Dubbo集群容错之Cluster

1. 前言

线上服务很少会单机部署,它不满足互联网三高架构的需求。服务一旦挂了,高可用就无从谈起,另外Dubbo单机默认最大200的并发处理也不满足所谓的高并发、高性能。所以,作为一款优秀的分布式服务框架,Dubbo是支持集群容错的。

Dubbo整个集群容错层的实现在dubbo-cluster模块,它包含很多组件,例如:Cluster、ClusterInvoker、Directory、LoadBalance等等,本文主要分析Cluster以及ClusterInvoker,其它组件会在后面的文章讨论。

2. Cluster

Cluster是Dubbo集群容错接口,接口定义很简单:

@SPI(Cluster.DEFAULT)
public interface Cluster {
    String DEFAULT = FailoverCluster.NAME;

    // 将一组Invokers聚合成一个Invoker
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

    // 获取集群容错扩展点实现
    static Cluster getCluster(String name) {
        return getCluster(name, true);
    }

    static Cluster getCluster(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            name = Cluster.DEFAULT;
        }
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }
}

可以看到,Cluster只有一个功能,那就是将一组Invoker聚合成一个具有集群容错能力的ClusterInvoker。Cluster本身不具备集群容错能力,它只会负责创建具备集群容错能力的ClusterInvoker。Cluster依赖Directory接口,Directory接口会提供可调用的服务列表,即一组Invoker,Cluster会从这一组Invoker里面选择一个最终的Invoker并发起调用,如果调用失败,则根据相应的集群容错策略做后续处理,例如服务重试。

Dubbo截止2.7.8版本,一共支持以下十种集群容错策略:

容错策略 说明
Failover 当出现失败时,会重试其他服务器,默认策略
Failfast 快速失败,当请求失败后,快速返回异常结果,不做任何重试,适用于非幂等接口
Failsafe 安全失败,出现异常直接忽略,不关心调用结果时适用
Fallback 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试
Forking 同时调用多个相同的服务,只要其中一个返回,则立即返回结果
Broadcast 广播调用所有可用的服务,任意一个节点报错则报错
Mock 提供调用失败时,返回伪造的响应结果
Available 不做负载均衡,遍历所有服务列表,找到第一个可用的节点直接发起调用
Mergeable 自动把多个节点请求得到的结果进行合并
ZoneAware 具备区域感知能力,优先调用同区域的服务。

其中,Failover是默认的策略,对应的类是FailoverCluster,我们已它为例。

public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }
}

Cluster实现很简单,就是创建具有集群容错能力的ClusterInvoker,服务重试的逻辑全在FailoverClusterInvoker里。

3. ClusterInvoker

ClusterInvoker接口继承自Invoker,它在原来的基础上,增加了集群容错的能力。

public interface ClusterInvoker<T> extends Invoker<T> {

    // 注册中心URL
    URL getRegistryUrl();

    // 获取服务目录
    Directory<T> getDirectory();
}

ClusterInvoker采用的还是装饰者模式,它本身不具备远程服务调用的能力,依赖于基础的Invoker,在它之上做集群容错。基础的Invoker是由Directory提供的,以RegistryDirectory为例,它会去注册中心订阅所需的服务,然后将ProviderUrls转换成一组Invoker,ClusterInvoker会从这一组Invoker里面去做路由过滤,负载均衡,容错处理等操作。

ClusterInvoker使用了模板方法模式,基类AbstractClusterInvoker的invoke()方法实现了一套算法骨架,流程依次是:Directory过滤服务列表,初始化LoadBalance,开始doInvoke调用。

public Result invoke(final Invocation invocation) throws RpcException {
    // 确保服务没注销
    checkWhetherDestroyed();
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    // 通过Directory过滤服务列表
    List<Invoker<T>> invokers = list(invocation);
    // 初始化 负载均衡
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

具体的容错策略就在子类的doInvoke()方法里,我们还是以FailoverClusterInvoker为例,流程依次是:

  1. 获取服务重试次数。
  2. 创建List存储调用过的Invoker,用于重试时规避。
  3. 创建Set存储调用过的Provider,用于日志。
  4. 发起服务调用,负载均衡选出Invoker发起调用,成功则返回,失败则重试。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    // 确保有服务可用
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 方法重试次数
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // 记录最后调用的异常
    RpcException le = null;
    // 记录已经调用过的Invoker,重试时规避
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // 记录已经调用过的Provider,用于日志
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            checkInvokers(copyInvokers, invocation);
        }
        // 负载均衡
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 服务调用
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException();
}

负载均衡没有直接调用父类的doSelect()方法,是因为子类要处理粘滞连接等操作。

其他集群容错策略的实现就不一一分析了,感兴趣的同学自己去看下。

4. 总结

Cluster是集群容错接口,它的功能仅仅是将一组Invoker聚合成一个具备集群容错能力的ClusterInvoker。在2.7.8版本中,Dubbo内置了十种集群容错策略,默认策略是Failover,对应的类是FailoverCluster,它会创建FailoverClusterInvoker。
ClusterInvoker采用装饰者模式+模板方法模式,本身不具备远程调用的能力,依赖于Directory提供的基础Invokers,在它们基础之上去做集群容错。基类invoke()方法实现了一套算法骨架,通过Directory过滤出可调用的服务,然后初始化LoadBalance,最终交给子类去做集群容错。

上一篇:微服务架构


下一篇:逐梦校友圈——冲刺day10