dubbo系列十、dubbo异步调用

dubbo异步调用

1.dubbo异步调用配置

provider端:

对整个provider端所有服务和方法进行异步 dubbo.provider.async=true

对某个具体服务所有方法异步 @Service(async = true)

对服务的某些具体方法异步 @Service(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)})

consumer端:

1.对整个consumer端所有服务和方法进行异步 dubbo.reference.async=true。很少这样使用

2.对某个具体引用服务所有方法异步 @Reference(async = true)。使用较少

3.对引用服务的某些具体方法异步 @Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)})。dubbo常用异步配置

4.在consumer端调用内设置RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true"); 优先级最高

常用使用规则:

consumer端覆盖provider端配置,方法级别优先。

通常不会在provider端设置异步,都是在consumer端进行异步设置。具体代码如下

@Reference(retries = 0, timeout = 2000, methods = {@Method(name="findProduct",async = true)})
private ProductService productService;


@Override
public Result<ProductVO> getProduct(ProductDTO dto) {
    Result<ProductVO> result = productService.findProduct(dto);//dubbo异步调用,此时输出result是null
    Future<Object> future = RpcContext.getContext().getFuture();//获取异步执行结果Future
    //do othder something
    try {
        result = (Result<ProductVO>) future.get();//获取具体的异步执行结果
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return result;
}

2.dubbo异步调用分析

2.1.异步调用分析

dubbo异步是在DubboInvoker(AbstractInvoker)内实现,具体代码如下

//com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(Invocation)
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let‘s allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }

    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        
        invocation.addAttachments(contextAttachments);//把RpcContext的隐式参数保存到RpcInvocation.attachments。如果在上下文设置了async=true就说明使用一部
    }
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//url上有异步标识${方法名}.async=true  或者 default.async=true
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());//设置异步标识到RpcInvocation.attachments
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        return doInvoke(invocation);//DubboInvoker.doInvoke(Invocation)
    } catch (InvocationTargetException e) { // biz exception
        //其它忽略
    }
}

就是保存异步标识到RpcInvocation.attachments,异步标识的来源:1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true"); 2. url上有${方法名}.async=true

//com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(Invocation)
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//url上有async=true,认为是异步
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//url上有return=false or 隐式参数有return=false,认为是onway,不需要响应
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {//dubbo异步调用
            ResponseFuture future = currentClient.request(inv, timeout);//网络调用
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));//隐式参数存放FutureAdapter,封装了ResponseFuture,因此consumer端需要RpcContext.getContext().getFuture()获取Future,然后Future.get()阻塞获取执行结果
            return new RpcResult();//返回RpcResult,其value是null,这就是异步调用获取的执行结果是null的原因
        } else {//dubbo同步调用
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();//阻塞获取执行结果
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

从上面代码发现,异步和同步区别就在于是否直接阻塞获取执行结果而已。异步调用后,业务上可以做一些其它事情,然后再获取异步执行结果。获取执行结果,异步和同步都一样,本质都是com.alibaba.dubbo.remoting.exchange.ResponseFuture.get()。异步调用是把ResponseFuture方到了dubbo上下文,需要客户端手工显示的获取ResponseFuture,接着再get()获取执行结果。

以前时候(dubbo2.5),dubbo异步调用还有个坑,比如A->B--->C,其中A调用B是异步,B调用C是同步,但是实际上B调用C也会异步,因为异步标识被传递了。这个问题在dubbo2.6是不存在的,可以看ContextFilter内,把隐式参数async清除了。

2.2.异步调用判断和异步标识来源

接着看异步的判断:

异步的判断RpcUtils.isAsync(URL, Invocation)

//com.alibaba.dubbo.rpc.support.RpcUtils.isAsync(URL, Invocation)
public static boolean isAsync(URL url, Invocation inv) {
    boolean isAsync;
    if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {//RpcInvocation.attachments有async=true
        isAsync = true;
    } else {
        isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);//url上有异步标识${方法名}.async=true  或者 default.async=true
    }
    return isAsync;
}

异步标识的来源:

  1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true");
  2. url上有${方法名}.async=true

那么url上有异步标识是通过@Reference(async = true) 或 @Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)}) 配置

@Reference(async = true)

dubbo系列十、dubbo异步调用

@Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)}) 配置 生成的url例子

dubbo系列十、dubbo异步调用

@Method这样设置异步,具体是在com.alibaba.dubbo.config.ReferenceConfig.init()

dubbo系列十、dubbo异步调用

3.dubbo异步调用图解

dubbo系列十、dubbo异步调用

步骤1. consumer端业务线程调用dubbo引用方法,由IO线程发起网络通信,发送请求给provider

步骤2. netty nio网络通信,异步

步骤3. 设置ResponseFuture到RpcContext上。和步骤2基本是同时进行

步骤4. 从RpcContext获取ResponseFuture。然后consumer端进行其它业务逻辑处理

步骤5. ResponseFuture 阻塞获取provider端执行结果。其中步骤6、7可能执行结果早于步骤5,

步骤6. provider端处理完毕,响应执行结果

步骤7. 设置provider端执行结果到ResponseFuture,并唤醒ResponseFuture

4.总结

dubbo异步是基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较少。

dubbo异步调用就是请求发送出去,然后业务做一些其它处理,最后再获取异步执行结果。

dubbo异步调用使用场景:

1.用于耗时久的请求调用,且下步业务处理,不依赖这个异步调用结果

2.用于不重要的操作,比如记录用户行为等

dubbo异步在平时工作中使用较少,基本还是同步为主,真要到了dubbo同步无法提高吞吐量和解耦,通常需要改变架构设计了,通常会使用mq进行异步消峰解耦。

dubbo系列十、dubbo异步调用

上一篇:[SAA + SAP] 28. Monitoring


下一篇:Train Wreck