2021-10-29

Java并发编程(一)线程创建、生命周期、控制这篇文章我们提到过Future可以实现异步编程,但是Future模式有自己的缺点:

Future虽然可以实现获取异步执行结果的需求,但是它没有提供通知的机制,我们无法得知Future什么时候完成。
要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。
//定义一个异步任务
Future future = executor.submit(()->{
Thread.sleep(2000);
return “hello world”;
});
//轮询获取结果
while (true){
if(future.isDone()) {
System.out.println(future.get());
break;
}
}
2021-10-29

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

创建CompletableFuture对象
CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:

runAsync 和 supplyAsync 方法的区别是runAsync返回的CompletableFuture是没有返回值的。

CompletableFuture future = CompletableFuture.runAsync(() -> {
System.out.println(“Hello”);
});

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    System.out.println("CompletableFuture");

而supplyAsync返回的CompletableFuture是有返回值的,下面的代码打印了future的返回值。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    System.out.println("CompletableFuture");

CompletableFuture获取结果
//同步获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()

getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
join()与get()区别在于join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get()返回一个具体的异常.

CompletableFuture主动结束
2021-10-29

future.get()在等待执行结果时,程序会一直block,如果此时调用complete(T t)会立即执行。

 CompletableFuture<String> future  = CompletableFuture.supplyAsync(() -> "Hello");

    future.complete("World");

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

可以看到future调用complete(T t)会立即执行。但是complete(T t)只能调用一次,后续的重复调用会失效。
如果future已经执行完毕能够返回结果,此时再调用complete(T t)则会无效。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    future.complete("World");

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

如果使用completeExceptionally(Throwable ex)则抛出一个异常,而不是一个成功的结果。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”);

    future.completeExceptionally(new Exception());

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

转换
map
2021-10-29

thenApply的功能相当于将CompletableFuture<\T>转换成CompletableFuture<\U>。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”)
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

下面的例子,展示了数据流的类型经历了如下的转换:String -> Integer -> Double。

    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "10")
            .thenApply(Integer::parseInt)
            .thenApply(i->i*10.0);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

flatMap
2021-10-29

thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

下面的例子展示了多次调用thenCompose()

CompletableFuture future = CompletableFuture.supplyAsync(() -> “100”)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + “100”))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

组合
(1)
2021-10-29

现在有CompletableFuture<\T>、CompletableFuture<\U>和一个函数(T,U)->V,thenCompose就是将CompletableFuture<\T>和CompletableFuture<\U>变为CompletableFuture<\V>。

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> “100”);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

注意:使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。

(2)
2021-10-29

thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture<\Void>类型。

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> “100”);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

2021-10-29

计算结果完成时的处理
当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。

执行特定的Action
2021-10-29
CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”)
.thenApply(s->s+" World")
.thenApply(s->s+ “\nThis is CompletableFuture demo”)
.thenApply(String::toLowerCase)
.whenComplete((result, throwable) -> System.out.println(result));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

执行完Action可以做转换
2021-10-29

    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
            .thenApply(s->s+"100")
            .handle((s, t) -> s != null ? Double.parseDouble(s) : 0);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:
2021-10-29

注意:在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。而whenComplete()的参数是BiConsumer,accept()方法返回void。所以,handle()相当于whenComplete()+转换。

纯消费
2021-10-29

thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。

CompletableFuture future = CompletableFuture.supplyAsync(() -> “Hello”)
.thenApply(s->s+" World")
.thenApply(s->s+ “\nThis is CompletableFuture demo”)
.thenApply(String::toLowerCase)
.thenAccept(System.out::print);
1
2
3
4
5
执行结果:
2021-10-29

应用:

dataList = CompletableFuture.supplyAsync(() -> queryContentIds(spaceId, contentId, level), LocalThreadPool.MESSAGE_FACTORY_CONTENT)
.thenApply(list -> list.toArray(new String[0]))
.thenApply(array -> visitRecordService.query(queryParamEntity, empNo, language, spaceId, array))
.whenComplete((visitRecordEntitySearchResultEntity, throwable) -> {
searchResultEntity.set(visitRecordEntitySearchResultEntity);
//回刷缓存
CompletableFuture.runAsync(() -> {
if (CURRENT.equals(level)) {
String key = WikiConstants.ICENTER_CONTENTS_SYMBOL_REDIS_KEY + spaceId + WikiConstants.ICENTER_CONTENTS_REDIS_KEY_CONTENT_VISIT_COUNT;
RedisUtil.hset(key, contentId, visitRecordEntitySearchResultEntity.getTotalRecord());
}
});
})
.get(5, TimeUnit.SECONDS)
.getItems().stream()
.map(item -> ImmutableMap.of(“create_by”, item.getVisitor(), “max_view_time”, item.getViewTime(), “sum_view_count”, String.valueOf(item.getViewCount())))
.collect(Collectors.toList());

Either
Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。

(1)

2021-10-29

Random random = new Random();

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future2";
    });

    CompletableFuture<Void> future =  future1.acceptEither(future2,str->System.out.println("The future is "+str));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果:The future is from future1 或者 The future is from future2。
因为future1和future2,执行的顺序是随机的。

(2)

2021-10-29

Random random = new Random();

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future2";
    });

    CompletableFuture<String> future =  future1.applyToEither(future2,str->"The future is "+str);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

执行结果也跟上面的程序类似。

其他方法
allOf、anyOf是CompletableFuture的静态方法。

allOf
2021-10-29

allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。于是我们借助Java 8的Stream来组合多个future的结果。

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> “tony”);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");

    CompletableFuture.allOf(future1, future2, future3)
            .thenApply(v ->
            Stream.of(future1, future2, future3)
                    .map(CompletableFuture::join)
                    .collect(Collectors.joining(" ")))
            .thenAccept(System.out::print);

执行结果:
2021-10-29

应用:

    CompletableFuture.allOf(contents.stream().filter(c -> StringUtils.isNotBlank(c.getId())).map(content -> CompletableFuture.runAsync(() -> {
        String contentId = Optional.ofNullable(content).map(Content::getId).orElse(null);
        //filter children without auth
        if (!forbiddenSet.contains(contentId)) {
            Sets.SetView<String> difference = Sets.difference(forbiddenSet, getParentPath(contentId));
            ImmutableSet<String> strings = difference.immutableCopy();
            if (strings.size() == forbiddenSet.size()) {
                viewTransferDTOS.add(generateViewTransferDTO(spaceId, content, lan));
            }
        }
    })).toArray(CompletableFuture[]::new)).join();

anyOf

2021-10-29

Random rand = new Random();
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return “from future1”;
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return “from future2”;
});
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return “from future3”;
});

    CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

使用anyOf()时,只要某一个future完成,就结束了。所以执行结果可能是"from future1"、“from future2”、"from future3"中的任意一个。

注意:anyOf 和 acceptEither、applyToEither的区别在于,后两者只能使用在两个future中,而anyOf可以使用在多个future中。

CompletableFuture异常处理
CompletableFuture在运行时如果遇到异常,可以使用get()并抛出异常进行处理,但这并不是一个最好的方法。CompletableFuture本身也提供了几种方式来处理异常。

exceptionally
2021-10-29

    CompletableFuture.supplyAsync(() -> "hello world")
            .thenApply(s -> {
                s = null;
                int length = s.length();
                return length;
            }).thenAccept(i -> System.out.println(i))
            .exceptionally(t -> {
                System.out.println("Unexpected error:" + t);
                return null;
            });

执行结果:
2021-10-29

对上面的代码稍微做了一下修改,修复了空指针的异常。

    CompletableFuture.supplyAsync(() -> "hello world")
            .thenApply(s -> {
               // s = null;
                int length = s.length();
                return length;
            }).thenAccept(i -> System.out.println(i))
            .exceptionally(t -> {
                System.out.println("Unexpected error:" + t);
                return null;
            });

执行结果:
2021-10-29

whenComplete
whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的作用差不多,可以捕获任意阶段的异常。如果没有异常的话,就执行action。

CompletableFuture.supplyAsync(() -> “hello world”)
.thenApply(s -> {
s = null;
int length = s.length();
return length;
}).thenAccept(i -> System.out.println(i))
.whenComplete((result, throwable) -> {

                if (throwable != null) {
                   System.out.println("Unexpected error:"+throwable);
                } else {
                    System.out.println(result);
                }

            });

执行结果:
2021-10-29

上一篇:2021-10-29


下一篇:2021-10-29