1. 前言
线上服务很少会单机部署,它不满足互联网三高架构的需求。服务一旦挂了,高可用就无从谈起,另外Dubbo单机默认最大200的并发处理也不满足所谓的高并发、高性能。所以,作为一款优秀的分布式服务框架,Dubbo是支持集群容错的。
Dubbo整个集群容错层的实现在dubbo-cluster
模块,它包含很多组件,例如:Cluster、ClusterInvoker、Directory、LoadBalance等等,本文主要分析Cluster以及ClusterInvoker,其它组件会在后面的文章讨论。
2. Cluster
Cluster是Dubbo集群容错接口,接口定义很简单:
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = FailoverCluster.NAME;
// 将一组Invokers聚合成一个Invoker
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
// 获取集群容错扩展点实现
static Cluster getCluster(String name) {
return getCluster(name, true);
}
static Cluster getCluster(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
}
可以看到,Cluster只有一个功能,那就是将一组Invoker聚合成一个具有集群容错能力的ClusterInvoker。Cluster本身不具备集群容错能力,它只会负责创建具备集群容错能力的ClusterInvoker。Cluster依赖Directory接口,Directory接口会提供可调用的服务列表,即一组Invoker,Cluster会从这一组Invoker里面选择一个最终的Invoker并发起调用,如果调用失败,则根据相应的集群容错策略做后续处理,例如服务重试。
Dubbo截止2.7.8版本,一共支持以下十种集群容错策略:
容错策略 | 说明 |
---|---|
Failover | 当出现失败时,会重试其他服务器,默认策略 |
Failfast | 快速失败,当请求失败后,快速返回异常结果,不做任何重试,适用于非幂等接口 |
Failsafe | 安全失败,出现异常直接忽略,不关心调用结果时适用 |
Fallback | 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试 |
Forking | 同时调用多个相同的服务,只要其中一个返回,则立即返回结果 |
Broadcast | 广播调用所有可用的服务,任意一个节点报错则报错 |
Mock | 提供调用失败时,返回伪造的响应结果 |
Available | 不做负载均衡,遍历所有服务列表,找到第一个可用的节点直接发起调用 |
Mergeable | 自动把多个节点请求得到的结果进行合并 |
ZoneAware | 具备区域感知能力,优先调用同区域的服务。 |
其中,Failover是默认的策略,对应的类是FailoverCluster,我们已它为例。
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
Cluster实现很简单,就是创建具有集群容错能力的ClusterInvoker,服务重试的逻辑全在FailoverClusterInvoker里。
3. ClusterInvoker
ClusterInvoker接口继承自Invoker,它在原来的基础上,增加了集群容错的能力。
public interface ClusterInvoker<T> extends Invoker<T> {
// 注册中心URL
URL getRegistryUrl();
// 获取服务目录
Directory<T> getDirectory();
}
ClusterInvoker采用的还是装饰者模式,它本身不具备远程服务调用的能力,依赖于基础的Invoker,在它之上做集群容错。基础的Invoker是由Directory提供的,以RegistryDirectory为例,它会去注册中心订阅所需的服务,然后将ProviderUrls转换成一组Invoker,ClusterInvoker会从这一组Invoker里面去做路由过滤,负载均衡,容错处理等操作。
ClusterInvoker使用了模板方法模式,基类AbstractClusterInvoker的invoke()
方法实现了一套算法骨架,流程依次是:Directory过滤服务列表,初始化LoadBalance,开始doInvoke调用。
public Result invoke(final Invocation invocation) throws RpcException {
// 确保服务没注销
checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 通过Directory过滤服务列表
List<Invoker<T>> invokers = list(invocation);
// 初始化 负载均衡
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
具体的容错策略就在子类的doInvoke()
方法里,我们还是以FailoverClusterInvoker为例,流程依次是:
- 获取服务重试次数。
- 创建List存储调用过的Invoker,用于重试时规避。
- 创建Set存储调用过的Provider,用于日志。
- 发起服务调用,负载均衡选出Invoker发起调用,成功则返回,失败则重试。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
// 确保有服务可用
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 方法重试次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 记录最后调用的异常
RpcException le = null;
// 记录已经调用过的Invoker,重试时规避
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
// 记录已经调用过的Provider,用于日志
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
checkInvokers(copyInvokers, invocation);
}
// 负载均衡
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 服务调用
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException();
}
负载均衡没有直接调用父类的
doSelect()
方法,是因为子类要处理粘滞连接等操作。
其他集群容错策略的实现就不一一分析了,感兴趣的同学自己去看下。
4. 总结
Cluster是集群容错接口,它的功能仅仅是将一组Invoker聚合成一个具备集群容错能力的ClusterInvoker。在2.7.8版本中,Dubbo内置了十种集群容错策略,默认策略是Failover,对应的类是FailoverCluster,它会创建FailoverClusterInvoker。
ClusterInvoker采用装饰者模式+模板方法模式,本身不具备远程调用的能力,依赖于Directory提供的基础Invokers,在它们基础之上去做集群容错。基类invoke()
方法实现了一套算法骨架,通过Directory过滤出可调用的服务,然后初始化LoadBalance,最终交给子类去做集群容错。