Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将Netflix的中间层服务连接在一起。Ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出Load Balancer(简称LB)后面所有的机器,Ribbon会自动的帮助你基于某种规则(如简单轮询,随即连接等)去连接这些机器。我们也很容易使用Ribbon实现自定义的负载均衡算法。
说起负载均衡一般都会想到服务端的负载均衡,常用产品包括LBS硬件或云服务、Nginx等,都是耳熟能详的产品。
而Spring Cloud提供了让服务调用端具备负载均衡能力的Ribbon,通过和Eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。
具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。
直接开撸代码,通过代码来看Ribbon是如何实现的。
配置
详解:
1.RibbonAutoConfiguration配置生成RibbonLoadBalancerClient实例。
代码位置:
spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonAutoConfiguration.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Configuration @ConditionalOnClass ({ IClient. class , RestTemplate. class , AsyncRestTemplate. class , Ribbon. class }) @RibbonClients @AutoConfigureAfter (name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration" ) @AutoConfigureBefore ({LoadBalancerAutoConfiguration. class , AsyncLoadBalancerAutoConfiguration. class }) @EnableConfigurationProperties (RibbonEagerLoadProperties. class ) public class RibbonAutoConfiguration { // 略 @Bean @ConditionalOnMissingBean (LoadBalancerClient. class ) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } // 略 } |
先看配置条件项,RibbonAutoConfiguration配置必须在LoadBalancerAutoConfiguration配置前执行,因为在LoadBalancerAutoConfiguration配置中会使用RibbonLoadBalancerClient实例。
RibbonLoadBalancerClient继承自LoadBalancerClient接口,是负载均衡客户端,也是负载均衡策略的调用方。
2.LoadBalancerInterceptorConfig配置生成:
1).负载均衡拦截器LoadBalancerInterceptor实例
包含:
LoadBalancerClient实现类的RibbonLoadBalancerClient实例
负载均衡的请求创建工厂LoadBalancerRequestFactory:实例
2).RestTemplate自定义的RestTemplateCustomizer实例
代码位置:
spring-cloud-commons-1.2.4.RELEASE.jar
org.springframework.cloud.client.loadbalancer
LoadBalancerAutoConfiguration.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
@Configuration @ConditionalOnClass (RestTemplate. class ) @ConditionalOnBean (LoadBalancerClient. class ) @EnableConfigurationProperties (LoadBalancerRetryProperties. class ) public class LoadBalancerAutoConfiguration { // 略 @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, transformers); } @Configuration @ConditionalOnMissingClass ( "org.springframework.retry.support.RetryTemplate" ) static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } // 略 } |
先看配置条件项:
要求在项目环境中必须要有RestTemplate类。
要求必须要有LoadBalancerClient接口的实现类的实例,也就是上一步生成的RibbonLoadBalancerClient。
3.通过上面一步创建的RestTemplateCustomizer配置所有RestTemplate实例,就是将负载均衡拦截器设置给RestTemplate实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
@Configuration @ConditionalOnClass (RestTemplate. class ) @ConditionalOnBean (LoadBalancerClient. class ) @EnableConfigurationProperties (LoadBalancerRetryProperties. class ) public class LoadBalancerAutoConfiguration { // 略 @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration. this .restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } // 略 @Configuration @ConditionalOnMissingClass ( "org.springframework.retry.support.RetryTemplate" ) static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } // 略 } |
restTemplate.setInterceptors(list)这个地方就是注入负载均衡拦截器的地方LoadBalancerInterceptor。
从这个地方实际上也可以猜出来,RestTemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。
也能看出来可以自定义拦截器实现其他目的。
4.RibbonClientConfiguration配置生成ZoneAwareLoadBalancer实例
代码位置:
spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonClientConfiguration.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@SuppressWarnings ( "deprecation" ) @Configuration @EnableConfigurationProperties //Order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import ({OkHttpRibbonConfiguration. class , RestClientRibbonConfiguration. class , HttpClientRibbonConfiguration. class }) public class RibbonClientConfiguration { // 略 @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if ( this .propertiesFactory.isSet(ILoadBalancer. class , name)) { return this .propertiesFactory.get(ILoadBalancer. class , config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } // 略 } |
ZoneAwareLoadBalancer继承自ILoadBalancer接口,该接口有一个方法:
1 2 3 4 5 6 7 8 |
/** * Choose a server from load balancer. * * @param key An object that the load balancer may use to determine which server to return. null if * the load balancer does not use this parameter. * @return server chosen */ public Server chooseServer(Object key); |
ZoneAwareLoadBalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseServer方法的实现选取某个服务实例。
拦截&请求
1.使用RestTemplate进行Get、Post等各种请求,都是通过doExecute方法实现
代码位置:
spring-web-4.3.12.RELEASE.jar
org.springframework.web.client
RestTemplate.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations { // 略 protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor<T> responseExtractor) throws RestClientException { Assert.notNull(url, "'url' must not be null" ); Assert.notNull(method, "'method' must not be null" ); ClientHttpResponse response = null ; try { ClientHttpRequest request = createRequest(url, method); if (requestCallback != null ) { requestCallback.doWithRequest(request); } response = request.execute(); handleResponse(url, method, response); if (responseExtractor != null ) { return responseExtractor.extractData(response); } else { return null ; } } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring( 0 , resource.indexOf( '?' )) : resource); throw new ResourceAccessException( "I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null ) { response.close(); } } } // 略 } |
支持的各种http请求方法最终都是调用doExecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。
2.生成请求实例创建工厂
上一步代码中,调用createRequest方法创建请求实例,这个方法是定义在父类中。
先整理出主要的继承关系:
createRequest方法实际是定义在HttpAccessor抽象类中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public abstract class HttpAccessor { private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); public void setRequestFactory(ClientHttpRequestFactory requestFactory) { Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null" ); this .requestFactory = requestFactory; } public ClientHttpRequestFactory getRequestFactory() { return this .requestFactory; } protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException { ClientHttpRequest request = getRequestFactory().createRequest(url, method); if (logger.isDebugEnabled()) { logger.debug( "Created " + method.name() + " request for \"" + url + "\"" ); } return request; } } |
在createRequest方法中调用getRequestFactory方法获得请求实例创建工厂,实际上getRequestFactory并不是当前HttpAccessor类中定义的,而是在子类InterceptingHttpAccessor中定义的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public abstract class InterceptingHttpAccessor extends HttpAccessor { private List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(); public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) { this .interceptors = interceptors; } public List<ClientHttpRequestInterceptor> getInterceptors() { return interceptors; } @Override public ClientHttpRequestFactory getRequestFactory() { ClientHttpRequestFactory delegate = super .getRequestFactory(); if (!CollectionUtils.isEmpty(getInterceptors())) { return new InterceptingClientHttpRequestFactory(delegate, getInterceptors()); } else { return delegate; } } } |
在这里做了个小动作,首先还是通过HttpAccessor类创建并获得SimpleClientHttpRequestFactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。
其次,在有拦截器注入的情况下,创建InterceptingClientHttpRequestFactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从InterceptingClientHttpRequestFactory工厂创建。
3.通过工厂创建请求实例
创建实例就看工厂的createRequest方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper { private final List<ClientHttpRequestInterceptor> interceptors; public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory, List<ClientHttpRequestInterceptor> interceptors) { super (requestFactory); this .interceptors = (interceptors != null ? interceptors : Collections.<ClientHttpRequestInterceptor>emptyList()); } @Override protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) { return new InterceptingClientHttpRequest(requestFactory, this .interceptors, uri, httpMethod); } } |
就是new了个InterceptingClientHttpRequest实例,并且把拦截器、基本请求实例创建工厂注进去。
4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept
可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。
1 2 3 4 5 |
ClientHttpRequest request = createRequest(url, method); if (requestCallback != null ) { requestCallback.doWithRequest(request); } response = request.execute(); |
实际请求实例是InterceptingClientHttpRequest,execute实际是在它的父类中。
类定义位置:
spring-web-4.3.12.RELEASE.jar
org.springframework.http.client
InterceptingClientHttpRequest.class
看一下它们的继承关系。
在execute方法中实际调用了子类实现的executeInternal方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public abstract class AbstractClientHttpRequest implements ClientHttpRequest { private final HttpHeaders headers = new HttpHeaders(); private boolean executed = false ; @Override public final HttpHeaders getHeaders() { return ( this .executed ? HttpHeaders.readOnlyHttpHeaders( this .headers) : this .headers); } @Override public final OutputStream getBody() throws IOException { assertNotExecuted(); return getBodyInternal( this .headers); } @Override public final ClientHttpResponse execute() throws IOException { assertNotExecuted(); ClientHttpResponse result = executeInternal( this .headers); this .executed = true ; return result; } protected void assertNotExecuted() { Assert.state(! this .executed, "ClientHttpRequest already executed" ); } protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException; protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException; } |
其实就是InterceptingClientHttpRequest类的executeInternal方法,其中,又调用了一个执行器InterceptingRequestExecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。
这里的拦截器实际上就是在配置阶段注入进RestTemplate实例的负载均衡拦截器LoadBalancerInterceptor实例,可参考上面配置阶段的第2步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest { // 略 @Override protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte [] bufferedOutput) throws IOException { InterceptingRequestExecution requestExecution = new InterceptingRequestExecution(); return requestExecution.execute( this , bufferedOutput); } private class InterceptingRequestExecution implements ClientHttpRequestExecution { private final Iterator<ClientHttpRequestInterceptor> iterator; public InterceptingRequestExecution() { this .iterator = interceptors.iterator(); } @Override public ClientHttpResponse execute(HttpRequest request, byte [] body) throws IOException { if ( this .iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this .iterator.next(); return nextInterceptor.intercept(request, body, this ); } else { ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod()); for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) { List<String> values = entry.getValue(); for (String value : values) { delegate.getHeaders().add(entry.getKey(), value); } } if (body.length > 0 ) { StreamUtils.copy(body, delegate.getBody()); } return delegate.execute(); } } } } |
5.负载均衡拦截器调用负载均衡客户端
在负载均衡拦截器LoadBalancerInterceptor类的intercept方法中,又调用了负载均衡客户端LoadBalancerClient实现类的execute方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this .loadBalancer = loadBalancer; this .requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this (loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept( final HttpRequest request, final byte [] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null , "Request URI does not contain a valid hostname: " + originalUri); return this .loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } } |
在配置阶段的第1步,可以看到实现类是RibbonLoadBalancerClient。
6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求
在RibbonLoadBalancerClient的第一个execute方法以及getServer方法中可以看到,实际上是通过ILoadBalancer的负载均衡器实现类作的chooseServer方法选取一个服务,交给接下来的请求对象发起一个请求。
这里的负载均衡实现类默认是ZoneAwareLoadBalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。
ZoneAwareLoadBalancer的创建可以参考配置阶段的第4步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
public class RibbonLoadBalancerClient implements LoadBalancerClient { @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null ) { throw new IllegalStateException( "No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null ; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null ) { throw new IllegalStateException( "No instances available for " + serviceId); } RibbonLoadBalancerContext context = this .clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null ; } // 略 protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null ) { return null ; } return loadBalancer.chooseServer( "default" ); // TODO: better handling of key } protected ILoadBalancer getLoadBalancer(String serviceId) { return this .clientFactory.getLoadBalancer(serviceId); } public static class RibbonServer implements ServiceInstance { private final String serviceId; private final Server server; private final boolean secure; private Map<String, String> metadata; public RibbonServer(String serviceId, Server server) { this (serviceId, server, false , Collections.<String, String> emptyMap()); } public RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) { this .serviceId = serviceId; this .server = server; this .secure = secure; this .metadata = metadata; } // 略 } } |
代码撸完,总结下。
普通使用RestTemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。
为RestTemplate增加了@LoanBalanced 注解后,实际上通过配置,为RestTemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。
转载:浅谈Spring Cloud Ribbon的原理