把之前CompletableFuture留下的坑给填上。

你好呀,我是歪歪。

填个坑吧,把之前一直欠着的 CompletableFuture 给写了,因为后台已经收到过好几次催更的留言了。

这玩意我在之前写的这篇文章中提到过:《面试官问我知不知道异步编程的Future》

因为是重点写 Future 的,所以 CompletableFuture 只是在最后一小节的时候简单的写了一下:

把之前CompletableFuture留下的坑给填上。

我就直接把当时的例子拿过来改一下吧,先把代码放在这里了:

public class MainTest {

    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "化妆完毕了。";
        }).whenComplete((returnStr, exception) -> {
            if (exception == null) {
                System.out.println(Thread.currentThread().getName() + returnStr);
            } else {
                System.out.println(Thread.currentThread().getName() + "女神放你鸽子了。");
                exception.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
        Thread.currentThread().join();
    }
}

核心需求就是女神化妆的时候,我可以先去干点自己的事情。

上面的程序运行结果是这样的:

把之前CompletableFuture留下的坑给填上。

符合我们的预期,没有任何毛病。

但是当你自己去编写程序的时候,有可能会遇到这样的情况:

把之前CompletableFuture留下的坑给填上。

什么情况,女神还在化妆呢,程序就运行完了?

把之前CompletableFuture留下的坑给填上。

是的,这就是我要说的第一个关于 CompletableFuture 的知识点:守护线程。

守护线程

你仔细观察前面提到的两个截图,对比一下他们的第 28 行,第二个截图少了一行代码:

Thread.currentThread().join();

这行代码是在干啥事呢?

目的就是阻塞主线程,哪怕你让主线程睡眠也行,反正目的就是把主线程阻塞住。

如果没有这行代码,出现的情况就是主线程直接运行完了,程序也就结束了。

你想想,会是什么原因?

这个时候你脑海里面应该啪的一下,很快就想到“守护线程”这个概念。

主线程是用户线程,这个没啥说的。

所有的用户线程执行完成后, JVM 也就退出了。

因此,出现上面问题的原因我有合理的理由猜测:CompletableFuture 里面执行的任务属于守护线程。

有了理论知识的支撑,并推出这个假设之后,就有了证实的方向,问题就很简单了。

啪的一下在这里打上一个断点,然后 Debug 起来,表达式一写就看出来了,确实是守护线程:

把之前CompletableFuture留下的坑给填上。

我一般是想要看到具体的代码的,就是得看到把这个线程设置为守护线程的那一行代码,我才会死心。

所以我就去找了一下,还是稍微花了点时间,过程就不描述了,直接说结论吧。

首先 CompletableFuture 默认的线程池是 ForkJoinPool,这个是很容易就能在源码里面找到的:

把之前CompletableFuture留下的坑给填上。

在 ForkJoinPool 里面,把线程都设置为守护线程的地方就在这里:

java.util.concurrent.ForkJoinPool#registerWorker

把之前CompletableFuture留下的坑给填上。

你若是想要自己调试的话,那么在这里打上断点之后,可以看一下调用栈,很快就摸清楚这个调用流程了:

把之前CompletableFuture留下的坑给填上。

另外,我在写文章的过程中还注意到了这个注释:

把之前CompletableFuture留下的坑给填上。

前面大概就是说 shutdown 和 shutdownNow 对于这个线程池来说没用。

如果,线程池里面的任务需要在程序终止前完成,那么应该在退出前调用 commonPool().awaitQuiescence。

把之前CompletableFuture留下的坑给填上。

所以,我的程序应该改成这样:

把之前CompletableFuture留下的坑给填上。

可以,不错,很优雅。

把之前CompletableFuture留下的坑给填上。

如果,你的异步任务非常重要,必须要执行完成,那么 ForkJoinPool 也给你封装好了一个方法:

java.util.concurrent.ForkJoinPool#quiesceCommonPool

把之前CompletableFuture留下的坑给填上。

另外,其实 CompletableFuture 也是支持传一个自定义线程池的:

把之前CompletableFuture留下的坑给填上。

比如,我把前面的程序改成下面这样:

把之前CompletableFuture留下的坑给填上。

加入指定线程池的逻辑,注释掉主线程 join 的代码,跑起来之后。诶,JVM 一直都在。

你说神奇不神奇?

把之前CompletableFuture留下的坑给填上。

我想这个原因就不用我来分析了吧?

和 Future 对比

CompletableFuture 其实就是 Future 的升级版。

Future 有的,它都有。

Future 的短板,它补上了。

毕竟一个是 JDK 1.5 时代的产物,另一个是 1.8 时代的作品:

把之前CompletableFuture留下的坑给填上。
把之前CompletableFuture留下的坑给填上。

中间跨度了整整 10 年,10 年啊!

把之前CompletableFuture留下的坑给填上。

所以,后来居上。

给大家对比一下 Future 和 CompletableFuture。

首先对于我个人而言,第一个最直观的感受是获取结果的姿势舒服多了。

我不得不又把这张图拿出来说说了,主要关注下面的两种 future 和 callback:

把之前CompletableFuture留下的坑给填上。

当我们用 Future 去实现异步,要获取异步结果的时候,是怎么样操作的?

是不是得调用 future.get() 方法去取值。

如果这个时候值已经准备就绪,在 future 里面封装好了,那么万事大吉,直接拿出来就可以用。

但是如果值还没有准备好呢?

是不是就阻塞等待了?

所以我常常说 Future 是一种阉割版的异步模式。

比如还是最开始的例子,如果我用 Future 来做,那么是这样的:

把之前CompletableFuture留下的坑给填上。

你仔细看我框起来的地方,是 main 线程开始获取结果,获取结果的这个动作把 main 线程给阻塞住了。

你就去洗不了头了,老弟。

好,你说你把获取结果的操作放到最后,没问题。

但是,无论你放在哪里,你都有一个 get 的动作,且你执行这个动作的时候,你也不知道值到底准备好了没,所以有可能出现阻塞等待的情况。

好,那么问题来了:如果消除这个阻塞等待呢?

很简单,换个思路,我们从主动问询,变成等待通知。

女神化妆好了之后,主动通知一下我不就好了吗?

用程序员的话说就是:运行结果出来了,你执行一下我留给你的回调函数不就好了吗?

CompletableFuture 就可以干这个事儿。

用 CompletableFuture 写一遍上面的程序就是这样的:

把之前CompletableFuture留下的坑给填上。

pool-1-thread-1,女神化妆的这个线程,她好了之后会主动叫你,你明白吗?

这就是我第一次接触到 CompletableFuture 后,学到的第一个让我感到舒服的地方。

这种写法你注意,whenComplete(returnStr, exception) 返回信息和异常信息在这里都有了。

除此之外,这个方法还是带返回值的,你也完全可以像是用 Future 那样通过 get 获取其返回值:

把之前CompletableFuture留下的坑给填上。

按理来说也就是可以用了。

但是如果你不需要返回值,它还提供了这样的写法:

把之前CompletableFuture留下的坑给填上。

正常情况和异常情况分开处理。

优雅,非常优雅。

还有更牛的。

前面我们化妆的线程和化妆完成的线程不是同一个线程吗:

把之前CompletableFuture留下的坑给填上。

假设我们需要两个不同的线程,一个只负责化妆,一个只负责通知。毕竟女神化完妆之后,更加女神了,搞两个线程我寻思也不过分。

把之前CompletableFuture留下的坑给填上。

改动点小到令人发指:

把之前CompletableFuture留下的坑给填上。

只需要把调用的方法从 whenComplete 改为 whenCompleteAysn 即可。

同样,这个方法也支持指定线程池:

把之前CompletableFuture留下的坑给填上。

你可以去看 CompletableFuture 里面有非常多的 Aysn 结尾的方法,大多都是干这个事儿的,以异步的形式在线程池中执行。

如果说上面的介绍让你觉得不过如此,那么再介绍一个 Future 没有的东西。

假设现在需求是这样的。

女神化完妆之后,还要花个一小会选衣服,不过分吧。

也就是说我们现在有两个异步任务,第一个是化妆,第二个是选衣服。

选衣服要在化妆完成之后进行,这两个任务是串行的,用 CompletableFuture 怎么实现呢?

我把代码贴一下,为了看起来更加直观,我没有用链式调用:

public class MainTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务一
        CompletableFuture<String> makeUpFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了。");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "化妆完毕了。";
        }, executorService);

        //任务二(makeUpFuture是方法调用方,意思是等makeUpFuture执行完成后执行再执行)
        CompletableFuture<String> dressFuture = makeUpFuture.thenApply(makeUp -> {
            System.out.println(Thread.currentThread().getName() + "-女神:" + makeUp + "我开始选衣服啦,好了我叫你。");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return makeUp + "衣服也选好啦。靓仔,走去玩儿吧。";
        });

        //获取结果
        dressFuture.thenAccept(result -> {
            System.out.println(Thread.currentThread().getName() + "-" + result);
        });
    }
}

这样输出结果就是这样的:

把之前CompletableFuture留下的坑给填上。

符合我们的预期。

假设我们想要选衣服的时候换另外一个线程怎么办呢?

别说不知道,这不刚才教你了吗,Async 结尾的方法,得活学活用起来:

把之前CompletableFuture留下的坑给填上。

前面讲的是多个异步任务串行执行,接下来再说一下并行。

CompletableFuture 里面提供了两个并行的方法:

把之前CompletableFuture留下的坑给填上。

两个方法的入参都是可变参数,就是一个个异步任务。

allOf 顾名思义就是入参的多个 CompletableFuture 都必须成功,才能继续执行。

而 anyOf 就是入参的多个 CompletableFuture 只要有一个成功就行。

还是举个例子。

假设,我是说假设啊,我是一个海王。

算了,我假设我有一个朋友吧。

把之前CompletableFuture留下的坑给填上。

他同时追求好几个女朋友。今天他打算约小美和小乖中的一个出门玩,随便哪个都行。谁先化妆完成,就约谁。另外一个就放她鸽子。

这个场景,我们就可以用 anyOf 来模拟,于是就出现了这样的代码:

把之前CompletableFuture留下的坑给填上。

从输出结果来看,最后和朋友约会的是小美。

都把小美约出来了,必须要一起吃个饭才行,对吧。

那么这个时候朋友问:小美,你想吃点什么呢?

小美肯定会回答:随便,就行,无所谓。

听到这样的回答,朋友心里就有底了,马上给出了一个方案:我们去吃沙县小吃或者黄焖鸡吧,哪一家店等的时间短,我们就去吃哪一家。

把之前CompletableFuture留下的坑给填上。

于是上面的代码,就变成了这样:

把之前CompletableFuture留下的坑给填上。

输出结果是这样的:

把之前CompletableFuture留下的坑给填上。

我把代码都放这里,你粘过去就能跑起来:

public class MainTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        CompletableFuture<String> xiaoMei = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-小美:我开始化妆了,好了我叫你。");
            try {
                int time = ThreadLocalRandom.current().nextInt(5);
                TimeUnit.SECONDS.sleep(time);
                System.out.println(Thread.currentThread().getName() + "-小美,化妆耗时:" + time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "小美:化妆完毕了。";
        }, executorService);

        CompletableFuture<String> xiaoGuai = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-小乖:我开始化妆了,好了我叫你。");
            try {
                int time = ThreadLocalRandom.current().nextInt(5);
                TimeUnit.SECONDS.sleep(time);
                System.out.println(Thread.currentThread().getName() + "-小乖,化妆耗时:" + time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "小乖:化妆完毕了。";
        }, executorService);

        CompletableFuture<Object> girl = CompletableFuture.anyOf(xiaoMei, xiaoGuai);
        girl.thenAccept(result -> {
            System.out.println("我看最后是谁先画完呢 = " + result);
        });

        CompletableFuture<String> eatChooseOne = girl.thenApplyAsync((result) -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result + "这里人少,我们去吃沙县小吃吧!";
        }, executorService);

        CompletableFuture<String> eatChooseTwo = girl.thenApplyAsync((result) -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result + "这里人少,我们去吃黄焖鸡吧!";
        }, executorService);

        CompletableFuture.allOf(eatChooseOne, eatChooseTwo).thenAccept(result -> {
            System.out.println("最终结果:" + result);
        });
    }
}

如果你说,小孩子才做选择,大人是全部都要。

那么,你可以试着用一下 allOf,只是需要注意的是,allOf 是不带返回值的。

好了,写到这里我都感觉有点像是 API 教学了,没啥劲。所以 CompletableFuture 还有很多很多的方法,我就不一一介绍了。

把之前CompletableFuture留下的坑给填上。

再说说 get 方法

最后,再看看 get 方法吧。之前发布的《看完JDK并发包源码的这个性能问题,我惊了!》这篇文章,有朋友看了之后有几个问题,我再串起来讲一下。

CompletableFuture 提交任务的方式有两种:

把之前CompletableFuture留下的坑给填上。

一种是 supplyAsync 带返回值的。

一种是 runAsync 返回值是 void 的,相当于没有返回值。

比如,我们用 supplyAsync 的时候:

把之前CompletableFuture留下的坑给填上。

就刻意返回一个 null。

我还可以扩展一下,假设我们的方法用的是 runAsync,本来就没有返回值的。

比如这样:

把之前CompletableFuture留下的坑给填上。

我们再看一下 get 方法:

把之前CompletableFuture留下的坑给填上。

你看这里的判断条件是 (r = result) == null

那么问题就来了,假设这个方法的返回值本来就是 null,也就是我们上面的情况,怎么办呢?

为 null 就有三种情况了:

  • 1.是 runAsync 这种,真的没有返回值,所以就算任务执行完成了,get 出来的确实就是 null。
  • 2.是有返回值的,只是目前任务还没执行完成,所以 result 还是 null。
  • 3.是有返回值的,返回的值就是 null。

怎么去分别出这三种情况呢?

那么就要看看这个 result 赋值的地方了,用脚指头猜也知道在这里搞了一些事情。

所以简单的找寻一番之后,可以找到这个关键的地方:

把之前CompletableFuture留下的坑给填上。

框起来的代码,目的是为了获取 CompletableFuture 类中的 result 字段的偏移量,并用大写的 RESULT 存储起来。

有经验的朋友看到这里大概就知道它要用 compareAndSwapObject 这个骚操作了:

把之前CompletableFuture留下的坑给填上。

然后就能找到这几个和 null 相关的地方:

把之前CompletableFuture留下的坑给填上。

答案就是我框起来的部分:在 CompletableFuture 里面,把 null 也封装到 AltResult 对象里面了。

基于此,可以区分出前面我说的那三种情况。

你看这里有一个专门的 completeNull 方法,其中的调用者就有 AysncRun 方法:

把之前CompletableFuture留下的坑给填上。

你可以在其调用的地方打上断点,然后把我前面用 runAsync 提交方式的代码跑起来:

把之前CompletableFuture留下的坑给填上。

再去看看调用栈,调试一下,你就知道 runAsync 这种,真的没有返回值的是怎么处理的了。

核心技术就是把 null 封装到 AltResult 对象里面。

然后如何分別返回值就是 null 的情况呢?

都有一个代表 null 的对象了,那还不简单吗,一个小小的判断就搞定了:

把之前CompletableFuture留下的坑给填上。

最后,再提一下这个方法:

java.util.concurrent.CompletableFuture#waitingGet

我之前那篇文章里面写了这样一句话:

加入这个自旋,是为了稍晚一点执行后续逻辑中的 park 代码,这个稍重一点的操作。但是我觉得这个 “brief spin-wait” 的收益其实是微乎其微的。

有小伙伴问我 park 的逻辑在哪?

其实就在 waitingGet 的 while 循环的最后一个分支里面,也就是我框起来的部分:

把之前CompletableFuture留下的坑给填上。

最后你顺着往下 Debug ,就能找到这个地方:

java.util.concurrent.CompletableFuture.Signaller#block

这里不就是 park 的逻辑吗:

把之前CompletableFuture留下的坑给填上。

打上断点自己玩去吧。

其实还有一种骚操作,我一般不告诉别人,也简单的分享一下吧。

还是拿前面的代码做演示,这个代码你跑起来之后,主线程由于调用了 get 方法,那么势必会阻塞等待异步任务的结果:

把之前CompletableFuture留下的坑给填上。

你就把它给跑起来,然后点一下这个照相机的图标:

把之前CompletableFuture留下的坑给填上。

就可以看到这样的画面:

把之前CompletableFuture留下的坑给填上。

主线程是 park 起来的,在哪被 park 起来的呢?

at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)

把之前CompletableFuture留下的坑给填上。

这不就是我刚刚给你说的方法吗?

然后你在这里打上断点,看一下调用堆栈,不就把主链路玩得明明白白的嘛:

把之前CompletableFuture留下的坑给填上。

怎么样,这波逆向操作,溜不溜,分分钟就学会了。

把之前CompletableFuture留下的坑给填上。

找到了 park 的地方,那么在哪儿被 unpark 的呢?

这还不简单吗?

反正我一搜就搜出来了:

把之前CompletableFuture留下的坑给填上。

然后再在 unpark 这里打上一个断点:

把之前CompletableFuture留下的坑给填上。

唤醒流程也可以调试的明明白白。

好了,挂起和唤醒都给你定位到关键地方了,就到这,玩去吧。

本文已收录自个人博客,欢迎大家来玩。

https://www.whywhy.vip/

把之前CompletableFuture留下的坑给填上。
上一篇:CompletableFuture 测试类


下一篇:CompletableFutures多线程阻塞获取结果