Java:CompletableFuture的一些使用方法

定义

CompletableFuture,实现了FutureCompletionStage

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	...
}

用法

  • supplyAsync supplyAsync(Supplier<U> supplier, Executor executor)异步,有返回值

Callable就是Supplier的一个实现,代表这个方法的结果有返回值

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
    return 10;
});
System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
System.out.println(future.get());
System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:41:10.852
// 主线程:Thread[main,5,main]执行时间:15:41:10.852
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:41:12.854
// 10
// 主线程:Thread[main,5,main]结束时间:15:41:12.854

可以看到,效果相当于ExecutorService submit(Callable<T> task)方法,有返回值

  • runAsync runAsync(Runnable runnable) 异步,无返回值
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
});
System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
System.out.println(future.get());
System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
// 主线程:Thread[main,5,main]执行时间:15:47:26.958
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:47:26.958
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:47:28.958
// null
// 主线程:Thread[main,5,main]结束时间:15:47:28.958

可以看到,效果相当于ExecutorService submit(Runnable task)方法,无返回值

这两方法各有一个重载版本,可以指定执行异步任务的Executor实现(线程池),如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程

传入自定义的线程池

ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}, executorService);
System.out.println(future.get());

或者使用默认的ForkJoinPool线程池

ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10.1;
}, pool);
System.out.println(future.get());

  • thenApply

表示将上一步的操作执行完后接下来执行下一步操作,并且将上一步的结果作为入参

ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    int sum = 0;
    try {
        TimeUnit.SECONDS.sleep(3);
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("(1)我是线程:" + Thread.currentThread());
    return sum;
}, pool);
CompletableFuture<Integer> f2 = f1.thenApply((result) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    return result + 1;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());

执行结果

(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
(2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
f2结果:5051

可以看到,f1和f2共用的一个线程

  • thenApplyAsync

我们把上一步的f1.thenApply 改为 f1.thenApplyAsync 来看看执行结果

(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
(2)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
f2结果:5051

可以看到和 thenApply 不一样的地方是,f2里是新开了一个线程来执行

  • thenAccept

thenApply 一样,接受上一步的执行结果作为入参,但是区别就是 thenAccept 没有返回值

ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    int sum = 0;
    try {
        TimeUnit.SECONDS.sleep(3);
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("(1)我是线程:" + Thread.currentThread());
    return sum;
}, pool);
CompletableFuture<Void> f2 = f1.thenAccept((result) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    System.out.println("(2)中的result:" + result);
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());

执行结果

(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
(2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
(2)中的result:5050
f2结果:null

可以看到f2是void无返回,所有获取到的结果为null,跟 thenApply 一样,f1和f2也是用的同一个线程

  • thenRun

thenAccept 不一样,它没有入参也没有返回值,获取f2的结果时也是null

CompletableFuture<Void> f2 = f1.thenRun(() -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
});
  • exceptionally
  1. f1中没有异常的情况
ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    int sum = 0;
    try {
        TimeUnit.SECONDS.sleep(3);
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("(1)我是线程:" + Thread.currentThread());
    return sum;
}, pool);
CompletableFuture<Integer> f2 = f1.exceptionally((ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    ex.printStackTrace();
    return 0;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());

执行结果

(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
f2结果:5050

f1无异常的时候,f2返回的是f1的结果

  1. f1有异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    if (true) {
        throw new RuntimeException("test ex");
    }
    return 5050;
}).exceptionally((ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    ex.printStackTrace();
    return 0;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());

执行结果

Java:CompletableFuture的一些使用方法

可以看到,当f1中有异常的时候,触发exceptionally中的执行逻辑

  • whenComplete
  1. 先看f1无异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    System.out.println("(2)中result:" + result);
    if (ex != null) {
        ex.printStackTrace();
    }
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());

执行结果

(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
(2)我是线程:Thread[main,5,main]
(2)中result:5050
(3)我是线程:Thread[main,5,main]
f1结果:5050
f2结果:5050

可以看到正常无异常的情况下,f1、f2都能获取到结果,其实可以理解为f2就是f1,因为正常使用的话都是连着一起写的,比如:CompletableFuture.supplyAsync().whenComplete()

  1. f1有异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    if (true) {
        throw new RuntimeException("test ex");
    }
    return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
	System.out.println("(2)中result:" + result);
	if (ex != null) {
    	ex.printStackTrace();
	}
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get()); // get时抛出了异常
System.out.println("f2结果:" + f2.get());

执行结果
Java:CompletableFuture的一些使用方法

可以看到,f1中发生了异常情况,f2中捕获到了异常,但是result没值,
因为result和ex是互斥的关系,有异常时没result,有result时没异常,同时主线程中执行f1.get()方法时,抛出了异常,导致主线程退出;
f2.get()一样会抛出异常,导致主线程中断

  • handle
  1. 无异常的情况

    f1的代码复用whenComplete中无异常的代码

CompletableFuture<String> f2 = f1.handle((result, ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    System.out.println("(2)中result:" + result);
    if (ex != null) {
        ex.printStackTrace();
    }
    return "我是(2)中的执行结果";
});

执行结果

(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
(2)我是线程:Thread[main,5,main]
(2)中result:5050
(3)我是线程:Thread[main,5,main]
f1结果:5050
f2结果:我是(2)中的执行结果

正常无异常的时候执行结果和whenComplete是一样的,只不过f2中可以返回自己的结果,跟f1完全不相关,类型都可以不一样

  1. 有异常时执行结果
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    if (true) {
        throw new RuntimeException("test ex");
    }
    return 5050;
});
CompletableFuture<String> f2 = f1.handle((result, ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    System.out.println("(2)中result:" + result);
    if (ex != null) {
        ex.printStackTrace();
    }
    return "我是(2)中的执行结果";
});
System.out.println("(3)我是线程:" + Thread.currentThread());
// System.out.println("f1结果:" + f1.get()); // f1触发了异常,会中断主线程
System.out.println("f2结果:" + f2.get());

执行结果
Java:CompletableFuture的一些使用方法

f1中触发了异常,异常以ex参数的形式传入到f2中,因为result和异常互斥,此时result为空,f2在打印了异常信息后,返回了自定义的结果,所以主线程中如果直接执行f2.get()能获取到结果,但是当f1出现异常后,在f2调用get方法之前调用了f1的get方法会结束主线程,f2相当于一个新的对象,消化了异常,返回了自己的信息,所以主线程能正常处理f2的get方法

  • allOf

allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture对象执行get方法时会抛出异常中断主线程的执行,如果都是正常执行,则get返回null

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    int i = 1 / 0;
    return 100;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    return 200;
});
CompletableFuture<Void> f3 = CompletableFuture.allOf(f1, f2).whenComplete((result, ex) -> {
    System.out.println("(3)我是线程:" + Thread.currentThread());
    System.out.println("(3)中result:" + result);
    if (ex != null) {
        ex.printStackTrace();
    }
});
System.out.println("(4)我是线程:" + Thread.currentThread());
System.out.println("f2结果:" + f3.get());
.junit.Test
ic void test3() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("(1)我是线程:" + Thread.currentThread());
    if (true) {
        throw new RuntimeException("test ex");
    }
    return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
    System.out.println("(2)我是线程:" + Thread.currentThread());
    System.out.println("(2)中result:" + result);
    if (ex != null) {
        ex.printStackTrace();
    }
});
System.out.println("(3)我是线程:" + Thread.currentThread());
// System.out.println("f1结果:" + f1.get()); // get时抛出了异常
System.out.println("f2结果:" + f2.get());

执行结果
Java:CompletableFuture的一些使用方法

上一篇:IO设计模式:Reactor和Proactor对比


下一篇:Proactor前摄器模式和Reactor反应器模式