CompletableFuture异步编排

一、创建异步对象

CompletableFuture提供了四个静态方法来创建一个异步操作。

//使用默认的线程池来回调
public static CompletableFuture<Void> runAsync(Runnable runnable) {...}
//该方法指定自定义的线程池executor来执行  --推荐
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {...}

//下面两个方法都能获取到异步方法的返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){...}
//推荐
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor){...}

runAsync方法创建:

public static void main(String[] args) {
    System.out.println("主线程开始");
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 3;
        System.out.println("运行结果:" + i);
    }, executor);
    System.out.println("主线程结束");
}
//运行结果:
主线程开始
主线程结束
当前线程:12
运行结果:3

supplyAsync方法创建:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程开始");
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 3;
        System.out.println("运行结果:" + i);
        return i;
    });
    //get方法会阻塞等待
    Integer integer = future1.get();
    System.out.println("主线程结束:" + integer);
}
//运行结果:
主线程开始
当前线程:12
运行结果:3
主线程结束:3

二、计算完成时回调方法

//处理正常返回的情况
//执行当前任务的线程继续执行whenComplete的任务
public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
//以新的线程池来执行任务
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}
//以指定的线程池executor来执行任务
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

//处理异常情况
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}

方法不以Async结尾,意味着使用相同的线程执行,而Async则会使用其他线程来执行。

whenComplete处理

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程开始");
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("运行结果:" + i);
        return i;
    }).whenComplete((res, exception)->{
        System.out.println("结果是:" + res + ";异常是:" + exception);
    });
    //get方法会阻塞等待
    Integer integer = future1.get();
    System.out.println("主线程结束:" + integer);
}

//结果:
主线程开始
当前线程:12
运行结果:5
结果是:5;异常是:null
主线程结束:5

exceptionally处理异常

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程开始");
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("运行结果:" + i);
        return i;
    }).whenComplete((res, exception)->{
        System.out.println("结果是:" + res + ";异常是:" + exception);
    }).exceptionally(throwable -> {
        System.out.println("捕获到异常:" + throwable);
        //异常是返回默认值0  即可将错误的结果进行修正
        return 0;
    });
    //get方法会阻塞等待
    Integer integer = future1.get();
    System.out.println("主线程结束:" + integer);
}

//结果:
主线程开始
当前线程:12
结果是:null;异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
捕获到异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
主线程结束:0

三、handle 方法

该方法可以处理completable执行完后的情况,不仅能够得到返回值,还可以修改返回值,也可感知异常。

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

hanle示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程开始");
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
        return i;
    }).handle((res, exception)->{
        System.out.println("异常是:" + exception);
        if (res != null) {
            return res * 2;
        }
        return 0;
    });
    //get方法会阻塞等待
    Integer integer = future1.get();
    System.out.println("主线程结束:" + integer);
}

//结果是:
主线程开始
当前线程:12
运行结果:5
异常是:null
主线程结束:10

四、线程串行化方法

//不能获取上一步的执行结果,无返回值
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}

//能获取上一步的执行结果,但是无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

//既能获取上一步的返回结果,又有自己的返回值
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

五、两任务组合——都要完成

//组合两个future,不需要获取future的结果,只需要两个future执行完成之后处理该任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action) {
    return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor) {
    return biRunStage(screenExecutor(executor), other, action);
}

//组合两个future,获取两个future返回的结果,然后处理任务,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor) {
    return biAcceptStage(screenExecutor(executor), other, action);
}

//组合两个future,获取两个future返回的结果,然后处理任务,并返回当前任务的返回值
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

六、两任务组合——一个完成

//两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}

//两个任务有一个执行完成,获取future的结果,处理任务,没有返回值
public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}

//两个任务有一个执行完成,获取future的结果,处理任务,并返回处理结果
public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}

七、多任务组合

//等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

//只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

CompletableFuture异步编排

上一篇:字符串的排列


下一篇:OpenGL 实用攻关 001 准备(开题)