dubbo异步调用
1.dubbo异步调用配置
provider端:
对整个provider端所有服务和方法进行异步 dubbo.provider.async=true
对某个具体服务所有方法异步 @Service(async = true)
对服务的某些具体方法异步 @Service(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)})
consumer端:
1.对整个consumer端所有服务和方法进行异步 dubbo.reference.async=true。很少这样使用
2.对某个具体引用服务所有方法异步 @Reference(async = true)。使用较少
3.对引用服务的某些具体方法异步 @Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)})。dubbo常用异步配置
4.在consumer端调用内设置RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true"); 优先级最高
常用使用规则:
consumer端覆盖provider端配置,方法级别优先。
通常不会在provider端设置异步,都是在consumer端进行异步设置。具体代码如下
@Reference(retries = 0, timeout = 2000, methods = {@Method(name="findProduct",async = true)})
private ProductService productService;
@Override
public Result<ProductVO> getProduct(ProductDTO dto) {
Result<ProductVO> result = productService.findProduct(dto);//dubbo异步调用,此时输出result是null
Future<Object> future = RpcContext.getContext().getFuture();//获取异步执行结果Future
//do othder something
try {
result = (Result<ProductVO>) future.get();//获取具体的异步执行结果
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return result;
}
2.dubbo异步调用分析
2.1.异步调用分析
dubbo异步是在DubboInvoker(AbstractInvoker)内实现,具体代码如下
//com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(Invocation)
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let‘s allow the current invoke to proceed
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
invocation.addAttachments(contextAttachments);//把RpcContext的隐式参数保存到RpcInvocation.attachments。如果在上下文设置了async=true就说明使用一部
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//url上有异步标识${方法名}.async=true 或者 default.async=true
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());//设置异步标识到RpcInvocation.attachments
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);//DubboInvoker.doInvoke(Invocation)
} catch (InvocationTargetException e) { // biz exception
//其它忽略
}
}
就是保存异步标识到RpcInvocation.attachments,异步标识的来源:1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true"); 2. url上有${方法名}.async=true
//com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(Invocation)
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//url上有async=true,认为是异步
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//url上有return=false or 隐式参数有return=false,认为是onway,不需要响应
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//dubbo异步调用
ResponseFuture future = currentClient.request(inv, timeout);//网络调用
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));//隐式参数存放FutureAdapter,封装了ResponseFuture,因此consumer端需要RpcContext.getContext().getFuture()获取Future,然后Future.get()阻塞获取执行结果
return new RpcResult();//返回RpcResult,其value是null,这就是异步调用获取的执行结果是null的原因
} else {//dubbo同步调用
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();//阻塞获取执行结果
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
从上面代码发现,异步和同步区别就在于是否直接阻塞获取执行结果而已。异步调用后,业务上可以做一些其它事情,然后再获取异步执行结果。获取执行结果,异步和同步都一样,本质都是com.alibaba.dubbo.remoting.exchange.ResponseFuture.get()
。异步调用是把ResponseFuture方到了dubbo上下文,需要客户端手工显示的获取ResponseFuture,接着再get()获取执行结果。
以前时候(dubbo2.5),dubbo异步调用还有个坑,比如A->B--->C,其中A调用B是异步,B调用C是同步,但是实际上B调用C也会异步,因为异步标识被传递了。这个问题在dubbo2.6是不存在的,可以看ContextFilter内,把隐式参数async清除了。
2.2.异步调用判断和异步标识来源
接着看异步的判断:
异步的判断RpcUtils.isAsync(URL, Invocation)
//com.alibaba.dubbo.rpc.support.RpcUtils.isAsync(URL, Invocation)
public static boolean isAsync(URL url, Invocation inv) {
boolean isAsync;
if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {//RpcInvocation.attachments有async=true
isAsync = true;
} else {
isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);//url上有异步标识${方法名}.async=true 或者 default.async=true
}
return isAsync;
}
异步标识的来源:
- RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, "true");
- url上有${方法名}.async=true
那么url上有异步标识是通过@Reference(async = true) 或 @Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)}) 配置
@Reference(async = true)
@Reference(methods={@Method(name="方法名1",async = true), @Method(name="方法名2",async = true)}) 配置 生成的url例子
@Method这样设置异步,具体是在com.alibaba.dubbo.config.ReferenceConfig.init()
3.dubbo异步调用图解
步骤1. consumer端业务线程调用dubbo引用方法,由IO线程发起网络通信,发送请求给provider
步骤2. netty nio网络通信,异步
步骤3. 设置ResponseFuture到RpcContext上。和步骤2基本是同时进行
步骤4. 从RpcContext获取ResponseFuture。然后consumer端进行其它业务逻辑处理
步骤5. ResponseFuture 阻塞获取provider端执行结果。其中步骤6、7可能执行结果早于步骤5,
步骤6. provider端处理完毕,响应执行结果
步骤7. 设置provider端执行结果到ResponseFuture,并唤醒ResponseFuture
4.总结
dubbo异步是基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较少。
dubbo异步调用就是请求发送出去,然后业务做一些其它处理,最后再获取异步执行结果。
dubbo异步调用使用场景:
1.用于耗时久的请求调用,且下步业务处理,不依赖这个异步调用结果
2.用于不重要的操作,比如记录用户行为等
dubbo异步在平时工作中使用较少,基本还是同步为主,真要到了dubbo同步无法提高吞吐量和解耦,通常需要改变架构设计了,通常会使用mq进行异步消峰解耦。