CompletableFuture是什么?
他可以优雅的做线程异步编排。
先干嘛,后干嘛。
runAsync(): 运行一个异步任务
public static CompletableFuture<Void> runAsync(Runnable runnable);
// executor指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 模拟线程池
private static Executor executor = Executors.newFixedThreadPool(4);
CompletableFuture.runAsync(() -> {
System.out.println("开始异步任务");
int i = 10 / 2;
System.out.println("结束异步任务");
}, executor);
supplyAsync(): 运行一个异步任务, 并提供一个返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// executor指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务");
int i = 10 / 2;
System.out.println("结束异步任务");
return i;
}, executor)
whenComplete(), exceptionally()
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务");
int i = 10 / 2;
System.out.println("结束异步任务");
return i;
}, executor).whenComplete((res, err) -> {
System.out.println("异步结果是:"+res+",,,whenComplete抓住的异常为:"+err);
}).exceptionally(err -> {
System.out.println("exceptionally感知到的异常为:"+err);
return 10;
});
两个线程间调度
thenRun() 启动一个任务, 接着上一个任务做。
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) ;
CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务,[[["+Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("结束异步任务");
return i;
}, executor)
.thenRun(() -> { // [main]thenRun方法
System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
})
.thenRunAsync(() -> { //[pool-1-thread-2]thenRun方法
System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
}, executor)
.thenRunAsync(() -> { //[ForkJoinPool.commonPool-worker-1]thenRun方法
System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
});
thenAccept():可以拿到上一次的返回结果,但是thenAccept方法本身没有返回值,只是拿着上一次的返回值做一个task
CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务,[[["+Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("结束异步任务");
return i;
}, executor).thenAcceptAsync(res -> {
System.out.println("["+Thread.currentThread().getName()+"]"+"thenAcceptAsync方法");
System.out.println("从上一步拿到了结果"+res);
}, executor);
thenApply():可以拿到上一次的返回结果,但是thenAccept方法本身没有返回值,只是拿着上一次的返回值做一个task
private static Integer thenApply() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务,[[[" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("结束异步任务");
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "thenAcceptAsync方法");
System.out.println("从上一步拿到了结果" + res);
return 10;
}, executor);
return future.get();
}
保证多个线程完成
- runBoth系列
- runAfterBoth:前面的组合任务做完以后,才做
- thenAcceptBothAsync:接收前两个返回值,做任务
- thenCombineAsync:接收前两个返回值,做任务并提供返回值
- runEither系列
- runAfterEitherAsync:完成其中一个即可
- acceptEitherAsync:接受前两个任务的其中的一个返回值
- applyToEitherAsync:接受前两个任务的其中的一个返回值, 并且自己提供返回值
private static void runBoth() {
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务01");
int i = 10 / 2;
System.out.println("结束异步任务01");
return i;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务02");
System.out.println("结束异步任务02");
return "future02 is end";
}, executor);
//runAfterBothAsync
future01.runAfterBothAsync(future02, () -> {
System.out.println("两个任务都结束后,开始异步任务03");
}, executor);
//thenAcceptBothAsync
future01.thenAcceptBothAsync(future02, (res01, res02) -> {
System.out.println("任务1结果为:"+res01);
System.out.println("任务2结果为:"+res02);
System.out.println("开始任务3:");
}, executor);
return future01.thenCombineAsync(future02, (res01, res02) -> {
System.out.println("任务1结果为:"+res01);
System.out.println("任务2结果为:"+res02);
System.out.println("开始任务3:");
return "任务三"+res01+res02;
}, executor).get();
}
private static void runEither() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务01");
int i = 10 / 2;
System.out.println("结束异步任务01");
return i;
}, executor);
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("开始异步任务02");
System.out.println("结束异步任务02");
return 2;
}, executor);
//runAfterEitherAsync
future01.runAfterEitherAsync(future02, () -> {
System.out.println("完成其中一个后,开始异步任务03");
}, executor);
//acceptEitherAsync
future01.acceptEitherAsync(future02, (res) -> {
System.out.println("任务结果为:"+res);
System.out.println("开始任务3:");
}, executor);
//applyToEitherAsync
CompletableFuture<Object> future = future01.applyToEitherAsync(future02, (res) -> {
System.out.println("任务结果为:" + res);
System.out.println("开始任务3:");
return "任务三的Either结果";
}, executor);
System.out.println(future.get());
}