一、创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作。
//使用默认的线程池来回调
public static CompletableFuture<Void> runAsync(Runnable runnable) {...}
//该方法指定自定义的线程池executor来执行 --推荐
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {...}
//下面两个方法都能获取到异步方法的返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){...}
//推荐
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor){...}
runAsync方法创建:
public static void main(String[] args) {
System.out.println("主线程开始");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 3;
System.out.println("运行结果:" + i);
}, executor);
System.out.println("主线程结束");
}
//运行结果:
主线程开始
主线程结束
当前线程:12
运行结果:3
supplyAsync方法创建:
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 3;
System.out.println("运行结果:" + i);
return i;
});
//get方法会阻塞等待
Integer integer = future1.get();
System.out.println("主线程结束:" + integer);
}
//运行结果:
主线程开始
当前线程:12
运行结果:3
主线程结束:3
二、计算完成时回调方法
//处理正常返回的情况
//执行当前任务的线程继续执行whenComplete的任务
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
//以新的线程池来执行任务
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
//以指定的线程池executor来执行任务
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
//处理异常情况
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
方法不以Async结尾,意味着使用相同的线程执行,而Async则会使用其他线程来执行。
whenComplete处理
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}).whenComplete((res, exception)->{
System.out.println("结果是:" + res + ";异常是:" + exception);
});
//get方法会阻塞等待
Integer integer = future1.get();
System.out.println("主线程结束:" + integer);
}
//结果:
主线程开始
当前线程:12
运行结果:5
结果是:5;异常是:null
主线程结束:5
exceptionally处理异常
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}).whenComplete((res, exception)->{
System.out.println("结果是:" + res + ";异常是:" + exception);
}).exceptionally(throwable -> {
System.out.println("捕获到异常:" + throwable);
//异常是返回默认值0 即可将错误的结果进行修正
return 0;
});
//get方法会阻塞等待
Integer integer = future1.get();
System.out.println("主线程结束:" + integer);
}
//结果:
主线程开始
当前线程:12
结果是:null;异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
捕获到异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
主线程结束:0
三、handle 方法
该方法可以处理completable执行完后的情况,不仅能够得到返回值,还可以修改返回值,也可感知异常。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
hanle示例
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}).handle((res, exception)->{
System.out.println("异常是:" + exception);
if (res != null) {
return res * 2;
}
return 0;
});
//get方法会阻塞等待
Integer integer = future1.get();
System.out.println("主线程结束:" + integer);
}
//结果是:
主线程开始
当前线程:12
运行结果:5
异常是:null
主线程结束:10
四、线程串行化方法
//不能获取上一步的执行结果,无返回值
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
//能获取上一步的执行结果,但是无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
//既能获取上一步的返回结果,又有自己的返回值
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
五、两任务组合——都要完成
//组合两个future,不需要获取future的结果,只需要两个future执行完成之后处理该任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
//组合两个future,获取两个future返回的结果,然后处理任务,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
//组合两个future,获取两个future返回的结果,然后处理任务,并返回当前任务的返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
六、两任务组合——一个完成
//两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
//两个任务有一个执行完成,获取future的结果,处理任务,没有返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
//两个任务有一个执行完成,获取future的结果,处理任务,并返回处理结果
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
七、多任务组合
//等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
//只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}