java – CompletableFuture allof(..).join()vs CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法将一些任务提交到公共线程池.以下是代码段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道以下与上面的代码有什么不同.我从下面的代码中删除了parent completableFuture,并为每个completableFuture而不是getNow()添加了join():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用它,并且线程池耗尽存在问题.任何指针都非常感激.

解决方法:

首先,.getNow()不起作用,因为此方法需要一个回退值作为未来尚未完成的案例的参数.由于您假设将来要在这里完成,您还应该使用join().

然后,在任何一种情况下都没有关于线程耗尽的区别,您在继续之前等待所有作业的完成,可能会阻塞当前线程.

避免这种情况的唯一方法是重构代码以不同步期望结果,而是在所有作业完成后安排后续处理操作完成.然后,使用allOf变得相关:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
    .map(resolver -> supplyAsync(() -> task.doWork()))
    .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
    .thenAccept(justVoid -> {
        // here, all jobs have been completed
        final List<Test> tests = completableFutures.stream()
            .flatMap(completableFuture -> completableFuture.join().stream())
            .collect(toList());
        // process the result here
    });

顺便说一下,关于收集的toArray方法,我建议阅读Arrays of Wisdom of the Ancients

上一篇:Python 3:Pool是否保持传递给map的原始数据顺序?


下一篇:为什么Java工作窃取池(ForkJoinPool)不支持从线程池中预先创建的线程?