浅谈Spring Cloud Ribbon的原理

Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将Netflix的中间层服务连接在一起。Ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出Load Balancer(简称LB)后面所有的机器,Ribbon会自动的帮助你基于某种规则(如简单轮询,随即连接等)去连接这些机器。我们也很容易使用Ribbon实现自定义的负载均衡算法。

说起负载均衡一般都会想到服务端的负载均衡,常用产品包括LBS硬件或云服务、Nginx等,都是耳熟能详的产品。

而Spring Cloud提供了让服务调用端具备负载均衡能力的Ribbon,通过和Eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。

具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。

直接开撸代码,通过代码来看Ribbon是如何实现的。

配置

浅谈Spring Cloud Ribbon的原理

详解:

1.RibbonAutoConfiguration配置生成RibbonLoadBalancerClient实例。

代码位置:

spring-cloud-netflix-core-1.3.5.RELEASE.jar

org.springframework.cloud.netflix.ribbon

RibbonAutoConfiguration.class

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

@Configuration

@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})

@RibbonClients

@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")

@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})

@EnableConfigurationProperties(RibbonEagerLoadProperties.class)

public class RibbonAutoConfiguration {

 

 // 略

 

 @Bean

 @ConditionalOnMissingBean(LoadBalancerClient.class)

 public LoadBalancerClient loadBalancerClient() {

  return new RibbonLoadBalancerClient(springClientFactory());

 }

  // 略

}

先看配置条件项,RibbonAutoConfiguration配置必须在LoadBalancerAutoConfiguration配置前执行,因为在LoadBalancerAutoConfiguration配置中会使用RibbonLoadBalancerClient实例。

RibbonLoadBalancerClient继承自LoadBalancerClient接口,是负载均衡客户端,也是负载均衡策略的调用方。

2.LoadBalancerInterceptorConfig配置生成:

1).负载均衡拦截器LoadBalancerInterceptor实例

包含:

LoadBalancerClient实现类的RibbonLoadBalancerClient实例

负载均衡的请求创建工厂LoadBalancerRequestFactory:实例

2).RestTemplate自定义的RestTemplateCustomizer实例

代码位置:

spring-cloud-commons-1.2.4.RELEASE.jar

org.springframework.cloud.client.loadbalancer

LoadBalancerAutoConfiguration.class

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

@Configuration

@ConditionalOnClass(RestTemplate.class)

@ConditionalOnBean(LoadBalancerClient.class)

@EnableConfigurationProperties(LoadBalancerRetryProperties.class)

public class LoadBalancerAutoConfiguration {

 // 略

 @Bean

 @ConditionalOnMissingBean

 public LoadBalancerRequestFactory loadBalancerRequestFactory(

   LoadBalancerClient loadBalancerClient) {

  return new LoadBalancerRequestFactory(loadBalancerClient, transformers);

 }

 

 @Configuration

 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")

 static class LoadBalancerInterceptorConfig {

  @Bean

  public LoadBalancerInterceptor ribbonInterceptor(

    LoadBalancerClient loadBalancerClient,

    LoadBalancerRequestFactory requestFactory) {

   return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);

  }

 

  @Bean

  @ConditionalOnMissingBean

  public RestTemplateCustomizer restTemplateCustomizer(

    final LoadBalancerInterceptor loadBalancerInterceptor) {

   return new RestTemplateCustomizer() {

    @Override

    public void customize(RestTemplate restTemplate) {

     List<ClientHttpRequestInterceptor> list = new ArrayList<>(

       restTemplate.getInterceptors());

     list.add(loadBalancerInterceptor);

     restTemplate.setInterceptors(list);

    }

   };

  }

 }

 // 略

}

先看配置条件项:

要求在项目环境中必须要有RestTemplate类。

要求必须要有LoadBalancerClient接口的实现类的实例,也就是上一步生成的RibbonLoadBalancerClient。

3.通过上面一步创建的RestTemplateCustomizer配置所有RestTemplate实例,就是将负载均衡拦截器设置给RestTemplate实例。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

@Configuration

@ConditionalOnClass(RestTemplate.class)

@ConditionalOnBean(LoadBalancerClient.class)

@EnableConfigurationProperties(LoadBalancerRetryProperties.class)

public class LoadBalancerAutoConfiguration {

 // 略

 

 @Bean

 public SmartInitializingSingleton loadBalancedRestTemplateInitializer(

   final List<RestTemplateCustomizer> customizers) {

  return new SmartInitializingSingleton() {

   @Override

   public void afterSingletonsInstantiated() {

    for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {

     for (RestTemplateCustomizer customizer : customizers) {

      customizer.customize(restTemplate);

     }

    }

   }

  };

 }

 

 // 略

 @Configuration

 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")

 static class LoadBalancerInterceptorConfig {

  @Bean

  public LoadBalancerInterceptor ribbonInterceptor(

    LoadBalancerClient loadBalancerClient,

    LoadBalancerRequestFactory requestFactory) {

   return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);

  }

 

  @Bean

  @ConditionalOnMissingBean

  public RestTemplateCustomizer restTemplateCustomizer(

    final LoadBalancerInterceptor loadBalancerInterceptor) {

   return new RestTemplateCustomizer() {

    @Override

    public void customize(RestTemplate restTemplate) {

     List<ClientHttpRequestInterceptor> list = new ArrayList<>(

       restTemplate.getInterceptors());

     list.add(loadBalancerInterceptor);

     restTemplate.setInterceptors(list);

    }

   };

  }

 }

 // 略

}

restTemplate.setInterceptors(list)这个地方就是注入负载均衡拦截器的地方LoadBalancerInterceptor。

从这个地方实际上也可以猜出来,RestTemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。

也能看出来可以自定义拦截器实现其他目的。

4.RibbonClientConfiguration配置生成ZoneAwareLoadBalancer实例

代码位置:

spring-cloud-netflix-core-1.3.5.RELEASE.jar

org.springframework.cloud.netflix.ribbon

RibbonClientConfiguration.class

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

@SuppressWarnings("deprecation")

@Configuration

@EnableConfigurationProperties

//Order is important here, last should be the default, first should be optional

// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653

@Import({OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})

public class RibbonClientConfiguration {

 // 略

 @Bean

 @ConditionalOnMissingBean

 public ILoadBalancer ribbonLoadBalancer(IClientConfig config,

   ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,

   IRule rule, IPing ping, ServerListUpdater serverListUpdater) {

  if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {

   return this.propertiesFactory.get(ILoadBalancer.class, config, name);

  }

  return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,

    serverListFilter, serverListUpdater);

 }

 

 // 略

}

ZoneAwareLoadBalancer继承自ILoadBalancer接口,该接口有一个方法:

1

2

3

4

5

6

7

8

/**

 * Choose a server from load balancer.

 *

 * @param key An object that the load balancer may use to determine which server to return. null if

 *   the load balancer does not use this parameter.

 * @return server chosen

 */

public Server chooseServer(Object key);

ZoneAwareLoadBalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseServer方法的实现选取某个服务实例。

拦截&请求

浅谈Spring Cloud Ribbon的原理

1.使用RestTemplate进行Get、Post等各种请求,都是通过doExecute方法实现

代码位置: 
spring-web-4.3.12.RELEASE.jar

org.springframework.web.client

RestTemplate.class

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {

 

 // 略

 

 protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,

   ResponseExtractor<T> responseExtractor) throws RestClientException {

 

  Assert.notNull(url, "'url' must not be null");

  Assert.notNull(method, "'method' must not be null");

  ClientHttpResponse response = null;

  try {

   ClientHttpRequest request = createRequest(url, method);

   if (requestCallback != null) {

    requestCallback.doWithRequest(request);

   }

   response = request.execute();

   handleResponse(url, method, response);

   if (responseExtractor != null) {

    return responseExtractor.extractData(response);

   }

   else {

    return null;

   }

  }

  catch (IOException ex) {

   String resource = url.toString();

   String query = url.getRawQuery();

   resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);

   throw new ResourceAccessException("I/O error on " + method.name() +

     " request for \"" + resource + "\": " + ex.getMessage(), ex);

  }

  finally {

   if (response != null) {

    response.close();

   }

  }

 }

 

 // 略

 

}

支持的各种http请求方法最终都是调用doExecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。

2.生成请求实例创建工厂

上一步代码中,调用createRequest方法创建请求实例,这个方法是定义在父类中。

先整理出主要的继承关系:

浅谈Spring Cloud Ribbon的原理

createRequest方法实际是定义在HttpAccessor抽象类中。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

public abstract class HttpAccessor {

 private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

 public void setRequestFactory(ClientHttpRequestFactory requestFactory) {

  Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");

  this.requestFactory = requestFactory;

 }

 public ClientHttpRequestFactory getRequestFactory() {

  return this.requestFactory;

 }

 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {

  ClientHttpRequest request = getRequestFactory().createRequest(url, method);

  if (logger.isDebugEnabled()) {

   logger.debug("Created " + method.name() + " request for \"" + url + "\"");

  }

  return request;

 }

}

在createRequest方法中调用getRequestFactory方法获得请求实例创建工厂,实际上getRequestFactory并不是当前HttpAccessor类中定义的,而是在子类InterceptingHttpAccessor中定义的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

public abstract class InterceptingHttpAccessor extends HttpAccessor {

 

 private List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>();

 

 public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {

  this.interceptors = interceptors;

 }

 

 public List<ClientHttpRequestInterceptor> getInterceptors() {

  return interceptors;

 }

 

 @Override

 public ClientHttpRequestFactory getRequestFactory() {

  ClientHttpRequestFactory delegate = super.getRequestFactory();

  if (!CollectionUtils.isEmpty(getInterceptors())) {

   return new InterceptingClientHttpRequestFactory(delegate, getInterceptors());

  }

  else {

   return delegate;

  }

 }

}

在这里做了个小动作,首先还是通过HttpAccessor类创建并获得SimpleClientHttpRequestFactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。

其次,在有拦截器注入的情况下,创建InterceptingClientHttpRequestFactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从InterceptingClientHttpRequestFactory工厂创建。

3.通过工厂创建请求实例

创建实例就看工厂的createRequest方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

 

 private final List<ClientHttpRequestInterceptor> interceptors;

 

 public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,

   List<ClientHttpRequestInterceptor> interceptors) {

 

  super(requestFactory);

  this.interceptors = (interceptors != null ? interceptors : Collections.<ClientHttpRequestInterceptor>emptyList());

 }

 

 

 @Override

 protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {

  return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);

 }

 

}

就是new了个InterceptingClientHttpRequest实例,并且把拦截器、基本请求实例创建工厂注进去。

4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept

可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。

1

2

3

4

5

ClientHttpRequest request = createRequest(url, method);

if (requestCallback != null) {

 requestCallback.doWithRequest(request);

}

response = request.execute();

实际请求实例是InterceptingClientHttpRequest,execute实际是在它的父类中。

类定义位置:

spring-web-4.3.12.RELEASE.jar

org.springframework.http.client

InterceptingClientHttpRequest.class

看一下它们的继承关系。

浅谈Spring Cloud Ribbon的原理

在execute方法中实际调用了子类实现的executeInternal方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

public abstract class AbstractClientHttpRequest implements ClientHttpRequest {

 

 private final HttpHeaders headers = new HttpHeaders();

 

 private boolean executed = false;

 

 @Override

 public final HttpHeaders getHeaders() {

  return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);

 }

 

 @Override

 public final OutputStream getBody() throws IOException {

  assertNotExecuted();

  return getBodyInternal(this.headers);

 }

 

 @Override

 public final ClientHttpResponse execute() throws IOException {

  assertNotExecuted();

  ClientHttpResponse result = executeInternal(this.headers);

  this.executed = true;

  return result;

 }

 

 protected void assertNotExecuted() {

  Assert.state(!this.executed, "ClientHttpRequest already executed");

 }

 

 protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;

 

 protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;

 

}

其实就是InterceptingClientHttpRequest类的executeInternal方法,其中,又调用了一个执行器InterceptingRequestExecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。

这里的拦截器实际上就是在配置阶段注入进RestTemplate实例的负载均衡拦截器LoadBalancerInterceptor实例,可参考上面配置阶段的第2步。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {

 

 // 略

 

 @Override

 protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {

  InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();

  return requestExecution.execute(this, bufferedOutput);

 }

 

 

 private class InterceptingRequestExecution implements ClientHttpRequestExecution {

 

  private final Iterator<ClientHttpRequestInterceptor> iterator;

 

  public InterceptingRequestExecution() {

   this.iterator = interceptors.iterator();

  }

 

  @Override

  public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {

   if (this.iterator.hasNext()) {

    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();

    return nextInterceptor.intercept(request, body, this);

   }

   else {

    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());

    for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {

     List<String> values = entry.getValue();

     for (String value : values) {

      delegate.getHeaders().add(entry.getKey(), value);

     }

    }

    if (body.length > 0) {

     StreamUtils.copy(body, delegate.getBody());

    }

    return delegate.execute();

   }

  }

 }

 

}

5.负载均衡拦截器调用负载均衡客户端

在负载均衡拦截器LoadBalancerInterceptor类的intercept方法中,又调用了负载均衡客户端LoadBalancerClient实现类的execute方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

 

 private LoadBalancerClient loadBalancer;

 private LoadBalancerRequestFactory requestFactory;

 

 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {

  this.loadBalancer = loadBalancer;

  this.requestFactory = requestFactory;

 }

 

 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {

  // for backwards compatibility

  this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));

 }

 

 @Override

 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,

   final ClientHttpRequestExecution execution) throws IOException {

  final URI originalUri = request.getURI();

  String serviceName = originalUri.getHost();

  Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);

  return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));

 }

}

在配置阶段的第1步,可以看到实现类是RibbonLoadBalancerClient。

6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求

在RibbonLoadBalancerClient的第一个execute方法以及getServer方法中可以看到,实际上是通过ILoadBalancer的负载均衡器实现类作的chooseServer方法选取一个服务,交给接下来的请求对象发起一个请求。

这里的负载均衡实现类默认是ZoneAwareLoadBalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。

ZoneAwareLoadBalancer的创建可以参考配置阶段的第4步。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

public class RibbonLoadBalancerClient implements LoadBalancerClient {

 @Override

 public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {

  ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

  Server server = getServer(loadBalancer);

  if (server == null) {

   throw new IllegalStateException("No instances available for " + serviceId);

  }

  RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,

    serviceId), serverIntrospector(serviceId).getMetadata(server));

 

  return execute(serviceId, ribbonServer, request);

 }

 

 @Override

 public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {

  Server server = null;

  if(serviceInstance instanceof RibbonServer) {

   server = ((RibbonServer)serviceInstance).getServer();

  }

  if (server == null) {

   throw new IllegalStateException("No instances available for " + serviceId);

  }

 

  RibbonLoadBalancerContext context = this.clientFactory

    .getLoadBalancerContext(serviceId);

  RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

 

  try {

   T returnVal = request.apply(serviceInstance);

   statsRecorder.recordStats(returnVal);

   return returnVal;

  }

  // catch IOException and rethrow so RestTemplate behaves correctly

  catch (IOException ex) {

   statsRecorder.recordStats(ex);

   throw ex;

  }

  catch (Exception ex) {

   statsRecorder.recordStats(ex);

   ReflectionUtils.rethrowRuntimeException(ex);

  }

  return null;

 }

   

 // 略

 

 protected Server getServer(ILoadBalancer loadBalancer) {

  if (loadBalancer == null) {

   return null;

  }

  return loadBalancer.chooseServer("default"); // TODO: better handling of key

 }

 

 protected ILoadBalancer getLoadBalancer(String serviceId) {

  return this.clientFactory.getLoadBalancer(serviceId);

 }

 

 public static class RibbonServer implements ServiceInstance {

  private final String serviceId;

  private final Server server;

  private final boolean secure;

  private Map<String, String> metadata;

 

  public RibbonServer(String serviceId, Server server) {

   this(serviceId, server, false, Collections.<String, String> emptyMap());

  }

 

  public RibbonServer(String serviceId, Server server, boolean secure,

    Map<String, String> metadata) {

   this.serviceId = serviceId;

   this.server = server;

   this.secure = secure;

   this.metadata = metadata;

  }

 

  // 略

 }

 

}

 

代码撸完,总结下。

普通使用RestTemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。

为RestTemplate增加了@LoanBalanced 注解后,实际上通过配置,为RestTemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。

转载:浅谈Spring Cloud Ribbon的原理

 

上一篇:SpringCloud之Ribbon负载均衡及Feign消费者调用服务


下一篇:SpringCloud之Ribbon负载均衡