This article is published on the NetEase Cloud Community with the authorization of its author, Zhao Jigang.
1. Usage
The service provider is unchanged. The consumer is configured as follows:
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    <dubbo:method name="sayHello" async="true" timeout="60000"/>
    <dubbo:method name="sayBye" async="true" timeout="60000"/>
</dubbo:reference>
Adding <dubbo:method name="xxx" async="true"/> to the configuration makes the single method xxx asynchronous. If every method of demoService should be asynchronous, set the attribute on the reference itself: <dubbo:reference async="true"/>.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.RpcContext;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // Prevent resolving to an IPv6 address; setting the property here only works in debug mode.
        // Passing -Djava.net.preferIPv4Stack=true on the command line works in either mode.
        System.setProperty("java.net.preferIPv4Stack", "true");

        asyncFuture2();
    }

    public static void asyncFuture1() throws ExecutionException, InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        long start = System.currentTimeMillis();

        // Each asynchronous call returns at once; fetch its Future before making the next call.
        demoService.sayHello("zhangsan");
        Future<String> helloFuture = RpcContext.getContext().getFuture();

        demoService.sayBye("lisi");
        Future<String> byeFuture = RpcContext.getContext().getFuture();

        final String helloStr = helloFuture.get(); // takes 5 s
        final String byeStr = byeFuture.get();     // takes 8 s

        // About 8 s in total, because the two calls overlap.
        System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));
    }

    public static void asyncFuture2() throws ExecutionException, InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        long start = System.currentTimeMillis();

        // asyncCall makes the call and hands back the matching Future in one step.
        Future<String> helloFuture = RpcContext.getContext().asyncCall(() -> demoService.sayHello("zhangsan"));
        Future<String> byeFuture = RpcContext.getContext().asyncCall(() -> demoService.sayBye("lisi"));

        final String helloStr = helloFuture.get(); // takes 5 s
        final String byeStr = byeFuture.get();     // takes 8 s

        // About 8 s in total, because the two calls overlap.
        System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));
    }
}
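This is the consumer's startup class. asyncFuture2() is the recommended usage. Note that the Callable passed to asyncCall is only a task; it does not start a new thread. asyncFuture2() therefore behaves like asyncFuture1() and uses the same resources: both issue their asynchronous calls from a single thread.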
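To make the "no new thread" point concrete, here is a minimal sketch that uses only the JDK. MiniContext, AsyncCallSketch and fakeRemoteCall are hypothetical stand-ins, not Dubbo classes. The sketch assumes, in line with the description above, that a helper shaped like asyncCall invokes the Callable inline and then returns the Future that the call left in a thread-local context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for a per-thread RPC context; not a Dubbo class.
class MiniContext {
    private static final ThreadLocal<Future<?>> CURRENT = new ThreadLocal<>();

    static void setFuture(Future<?> f) {
        CURRENT.set(f);
    }

    // Runs the task inline on the calling thread, then returns whatever
    // Future the task's "remote call" left in the thread-local slot.
    @SuppressWarnings("unchecked")
    static <T> Future<T> asyncCall(Callable<T> task) {
        try {
            task.call(); // executes on the caller's thread; the return value is ignored
        } catch (Exception e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return (Future<T>) CURRENT.get();
    }
}

class AsyncCallSketch {
    // Simulated async proxy call: records the invoking thread, registers a
    // Future for the eventual reply, and returns null immediately.
    static String fakeRemoteCall(String name, StringBuilder invokedOn) {
        invokedOn.append(Thread.currentThread().getName());
        MiniContext.setFuture(CompletableFuture.supplyAsync(() -> "Hello " + name));
        return null;
    }

    // True if the Callable ran on the same thread that called asyncCall.
    static boolean callableRunsOnCallerThread() {
        StringBuilder invokedOn = new StringBuilder();
        MiniContext.asyncCall(() -> fakeRemoteCall("zhangsan", invokedOn));
        return invokedOn.toString().equals(Thread.currentThread().getName());
    }

    // Blocks on the Future returned by asyncCall and returns the reply.
    static String result() {
        try {
            return MiniContext.asyncCall(
                    () -> fakeRemoteCall("zhangsan", new StringBuilder())).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callableRunsOnCallerThread()); // true
        System.out.println(result());                     // Hello zhangsan
    }
}
```

In this sketch the only work done off the caller's thread is the simulated reply. The Callable itself is just a way of bundling "make the call, then pick up its Future" into one expression.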
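2. Source code analysis of asyncFuture1()
Start with asyncFuture1(). The overall steps are:
demoService.sayHello("zhangsan"); creates a Future object and stores it in the current thread's context.
Future<String> helloFuture = RpcContext.getContext().getFuture(); retrieves from the current thread's context the Future stored in the first step.
final String helloStr = helloFuture.get(); blocks until the result is available and reads it from the Future.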
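The three steps can be mimicked with a ThreadLocal and a CompletableFuture. This is only an illustrative sketch: HandOffSketch and its methods are hypothetical names, and the single thread-local slot is an assumption modelled on the "store in the thread's context, then fetch" description above. It also shows why each Future has to be fetched right after its call: a second call on the same thread replaces the stored Future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class HandOffSketch {
    // One Future slot per thread, standing in for the thread's RPC context.
    private static final ThreadLocal<Future<String>> SLOT = new ThreadLocal<>();

    // Step 1: the "proxy call" starts the work, stores a Future, and returns null at once.
    static String sayHello(String name) {
        SLOT.set(CompletableFuture.supplyAsync(() -> "Hello " + name));
        return null;
    }

    // Step 2: fetch the Future stored by the most recent call on this thread.
    static Future<String> getFuture() {
        return SLOT.get();
    }

    // Step 3: block until the result is available.
    private static String await(Future<String> f) {
        try {
            return f.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Correct order: fetch each Future right after its call.
    static String interleaved() {
        sayHello("zhangsan");
        Future<String> first = getFuture();
        sayHello("lisi");
        Future<String> second = getFuture();
        return await(first) + " | " + await(second);
    }

    // Pitfall: two calls, then one getFuture(); the first Future has been overwritten.
    static String overwritten() {
        sayHello("zhangsan");
        sayHello("lisi");
        return await(getFuture());
    }

    public static void main(String[] args) {
        System.out.println(interleaved()); // Hello zhangsan | Hello lisi
        System.out.println(overwritten()); // Hello lisi
    }
}
```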
Main execution flow (for the detailed flow, see the three blog posts referenced at the beginning of the article):
1. demoService.sayHello("zhangsan");
    --> FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
    --> DubboInvoker.doInvoke(final Invocation invocation)
FutureFilter:
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    return result;
}
For the asynchronous calls above (asyncFuture1() and asyncFuture2()), FutureFilter has no effect. This filter is mainly used for event notification, which is covered later.
DubboInvoker.doInvoke(final Invocation invocation):
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) { // one-way: no return value
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // asynchronous, with a return value
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // synchronous, with a return value
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
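The three modes:
If isOneway is set (no return value is needed), the request is sent directly, whether the call is synchronous or asynchronous. No Future is created, and an empty RpcResult is returned.
If isAsync is set (asynchronous):
    first a ResponseFuture is created and wrapped in a FutureAdapter (the ResponseFuture is created by the same code as in the synchronous case, and the resulting object is a DefaultFuture);
    then the FutureAdapter is stored in the current thread's context, RpcContext.getContext();
    finally an empty RpcResult is returned.
If the call is synchronous, a ResponseFuture is created and its get() method is called immediately, blocking until the result arrives (see the three articles referenced at the beginning).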
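The three branches can be reduced to a small JDK-only sketch. DispatchSketch, request and describe are hypothetical names. A CompletableFuture stands in for ResponseFuture and a ThreadLocal for the RpcContext. The point is only the control flow: one-way is checked first, async parks the Future and returns an empty result, and sync blocks on the same kind of Future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class DispatchSketch {
    // Per-thread Future slot, standing in for the RPC context.
    static final ThreadLocal<Future<String>> SLOT = new ThreadLocal<>();

    // Hypothetical transport: "sends" a request and completes a Future with the reply.
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "reply:" + payload);
    }

    // Mirrors the three branches; the return value is what the caller sees immediately.
    static String invoke(String payload, boolean oneway, boolean async) {
        if (oneway) {            // fire and forget: no Future is kept, empty result
            request(payload);
            SLOT.set(null);
            return null;
        } else if (async) {      // async: park the Future in the context, empty result
            SLOT.set(request(payload));
            return null;
        } else {                 // sync: block right here on the same kind of Future
            SLOT.set(null);
            return request(payload).join();
        }
    }

    // Reports the immediate result and whatever the stored Future later yields.
    static String describe(String payload, boolean oneway, boolean async) {
        String immediate = invoke(payload, oneway, async);
        Future<String> f = SLOT.get();
        String later;
        try {
            later = (f == null) ? "none" : f.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return "immediate=" + immediate + ", future=" + later;
    }

    public static void main(String[] args) {
        System.out.println(describe("a", true, true));   // immediate=null, future=none
        System.out.println(describe("b", false, true));  // immediate=null, future=reply:b
        System.out.println(describe("c", false, false)); // immediate=reply:c, future=none
    }
}
```

Because the one-way check comes first, a call flagged as both one-way and async takes the one-way branch in this sketch, matching the order of the if/else chain in doInvoke.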
A brief look at FutureAdapter:
public class FutureAdapter<V> implements Future<V> {

    private final ResponseFuture future;

    public FutureAdapter(ResponseFuture future) {
        this.future = future;
    }

    public ResponseFuture getFuture() {
        return future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        return future.isDone();
    }

    @SuppressWarnings("unchecked")
    public V get() throws InterruptedException, ExecutionException {
        try {
            return (V) (((Result) future.get()).recreate());
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }

    @SuppressWarnings("unchecked")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Quoted as-is. Note that unit.convert(timeout, MILLISECONDS) treats timeout as milliseconds and
        // converts it into unit, so the result is a millisecond count only when unit is MILLISECONDS.
        int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
        try {
            return (V) (((Result) future.get(timeoutInMillis)).recreate());
        } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
            throw new TimeoutException(StringUtils.toString(e));
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }
}
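One detail in get(long timeout, TimeUnit unit) above deserves a second look: the direction of the TimeUnit conversion. The JDK-only sketch below uses hypothetical names (TimeoutConversionSketch, asQuoted, toMillis) and compares the expression as it appears in the listing with the conversion that yields milliseconds. It makes no claim about which Dubbo versions contain either form.

```java
import java.util.concurrent.TimeUnit;

class TimeoutConversionSketch {
    // Same shape as the quoted line: reads timeout as milliseconds and converts it into unit.
    static long asQuoted(long timeout, TimeUnit unit) {
        return unit.convert(timeout, TimeUnit.MILLISECONDS);
    }

    // Reads timeout as a duration in unit and converts it to milliseconds.
    static long toMillis(long timeout, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    public static void main(String[] args) {
        System.out.println(asQuoted(5, TimeUnit.SECONDS));      // 0
        System.out.println(toMillis(5, TimeUnit.SECONDS));      // 5000
        System.out.println(asQuoted(5, TimeUnit.MILLISECONDS)); // 5
    }
}
```

With the quoted form, a caller who passes a timeout in seconds ends up with a much shorter wait than intended, so when calling the timed get() on such an adapter it is safest to pass the timeout in TimeUnit.MILLISECONDS.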
Finally, look back at FutureFilter:
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    return result;
}