CompletableFuture类优雅异步编排的方法介绍

CompletableFuture是什么?
他可以优雅的做线程异步编排。
先干嘛,后干嘛。

runAsync(): 运行一个异步任务

public static CompletableFuture<Void> runAsync(Runnable runnable);
// executor指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 

// 模拟线程池
private static Executor executor = Executors.newFixedThreadPool(4);


CompletableFuture.runAsync(() -> {
            System.out.println("开始异步任务");

            int i = 10 / 2;

            System.out.println("结束异步任务");

        }, executor);
       
supplyAsync(): 运行一个异步任务, 并提供一个返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// executor指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,  Executor executor) ;


CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("开始异步任务");

        int i = 10 / 2;

        System.out.println("结束异步任务");
        return i;
    }, executor)
whenComplete(), exceptionally()
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务");

            int i = 10 / 2;

            System.out.println("结束异步任务");
            return i;
        }, executor).whenComplete((res, err) -> {

            System.out.println("异步结果是:"+res+",,,whenComplete抓住的异常为:"+err);

        }).exceptionally(err -> {
            System.out.println("exceptionally感知到的异常为:"+err);
            return 10;
        });

两个线程间调度

thenRun() 启动一个任务, 接着上一个任务做。
    public CompletableFuture<Void> thenRun(Runnable action);
    public CompletableFuture<Void> thenRunAsync(Runnable action);
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) ;
        CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务,[[["+Thread.currentThread().getName());

            int i = 10 / 2;

            System.out.println("结束异步任务");
            return i;
        }, executor)
       .thenRun(() -> { // [main]thenRun方法
             System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
        })
        .thenRunAsync(() -> { //[pool-1-thread-2]thenRun方法
            System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
        }, executor)
        .thenRunAsync(() -> { //[ForkJoinPool.commonPool-worker-1]thenRun方法
            System.out.println("["+Thread.currentThread().getName()+"]"+"thenRun方法");
        });
thenAccept():可以拿到上一次的返回结果,但是thenAccept方法本身没有返回值,只是拿着上一次的返回值做一个task
        CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务,[[["+Thread.currentThread().getName());

            int i = 10 / 2;

            System.out.println("结束异步任务");
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println("["+Thread.currentThread().getName()+"]"+"thenAcceptAsync方法");
            System.out.println("从上一步拿到了结果"+res);
        }, executor);

thenApply():可以拿到上一次的返回结果,但是thenAccept方法本身没有返回值,只是拿着上一次的返回值做一个task
    private static Integer thenApply() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务,[[[" + Thread.currentThread().getName());

            int i = 10 / 2;

            System.out.println("结束异步任务");
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + "thenAcceptAsync方法");
            System.out.println("从上一步拿到了结果" + res);
            return 10;
        }, executor);

        return future.get();
    }


保证多个线程完成
  1. runBoth系列
  • runAfterBoth:前面的组合任务做完以后,才做
  • thenAcceptBothAsync:接收前两个返回值,做任务
  • thenCombineAsync:接收前两个返回值,做任务并提供返回值
  1. runEither系列
  • runAfterEitherAsync:完成其中一个即可
  • acceptEitherAsync:接受前两个任务的其中的一个返回值
  • applyToEitherAsync:接受前两个任务的其中的一个返回值, 并且自己提供返回值


    private static void runBoth() {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务01");

            int i = 10 / 2;

            System.out.println("结束异步任务01");
            return i;
        }, executor);

        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务02");
            System.out.println("结束异步任务02");
            return "future02 is end";
        }, executor);


        //runAfterBothAsync
        future01.runAfterBothAsync(future02, () -> {
            System.out.println("两个任务都结束后,开始异步任务03");
        }, executor);

        //thenAcceptBothAsync
        future01.thenAcceptBothAsync(future02, (res01, res02) -> {
            System.out.println("任务1结果为:"+res01);
            System.out.println("任务2结果为:"+res02);
            System.out.println("开始任务3:");
        }, executor);

        return future01.thenCombineAsync(future02, (res01, res02) -> {
            System.out.println("任务1结果为:"+res01);
            System.out.println("任务2结果为:"+res02);
            System.out.println("开始任务3:");
            return "任务三"+res01+res02;
        }, executor).get();

    }


    private static void runEither() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务01");

            int i = 10 / 2;

            System.out.println("结束异步任务01");
            return i;
        }, executor);

        CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始异步任务02");
            System.out.println("结束异步任务02");
            return 2;
        }, executor);


        //runAfterEitherAsync
      future01.runAfterEitherAsync(future02, () -> {
           System.out.println("完成其中一个后,开始异步任务03");
       }, executor);

        //acceptEitherAsync
        future01.acceptEitherAsync(future02, (res) -> {
            System.out.println("任务结果为:"+res);
           System.out.println("开始任务3:");
        }, executor);

        //applyToEitherAsync
        CompletableFuture<Object> future = future01.applyToEitherAsync(future02, (res) -> {
            System.out.println("任务结果为:" + res);
            System.out.println("开始任务3:");
            return "任务三的Either结果";
        }, executor);
        
        System.out.println(future.get());


    }
上一篇:CompletableFuture实现异步获取结果并且等待所有异步任务完成


下一篇:Dubbo中CompletableFuture异步调用