定义
CompletableFuture
,实现了Future
和CompletionStage
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}
用法
-
supplyAsync
supplyAsync(Supplier<U> supplier, Executor executor)
异步,有返回值
Callable
就是Supplier
的一个实现,代表这个方法的结果有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
return 10;
});
System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
System.out.println(future.get());
System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:41:10.852
// 主线程:Thread[main,5,main]执行时间:15:41:10.852
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:41:12.854
// 10
// 主线程:Thread[main,5,main]结束时间:15:41:12.854
可以看到,效果相当于ExecutorService submit(Callable<T> task)
方法,有返回值
-
runAsync
runAsync(Runnable runnable)
异步,无返回值
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
});
System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now());
System.out.println(future.get());
System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now());
// 主线程:Thread[main,5,main]执行时间:15:47:26.958
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:47:26.958
// 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:47:28.958
// null
// 主线程:Thread[main,5,main]结束时间:15:47:28.958
可以看到,效果相当于ExecutorService submit(Runnable task)
方法,无返回值
这两方法各有一个重载版本,可以指定执行异步任务的Executor实现(线程池),如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程
传入自定义的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executorService);
System.out.println(future.get());
或者使用默认的ForkJoinPool线程池
ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10.1;
}, pool);
System.out.println(future.get());
- thenApply
表示将上一步的操作执行完后接下来执行下一步操作,并且将上一步的结果作为入参
ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int sum = 0;
try {
TimeUnit.SECONDS.sleep(3);
for (int i = 1; i <= 100; i++) {
sum += i;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("(1)我是线程:" + Thread.currentThread());
return sum;
}, pool);
CompletableFuture<Integer> f2 = f1.thenApply((result) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
return result + 1;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
(2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
f2结果:5051
可以看到,f1和f2共用的一个线程
- thenApplyAsync
我们把上一步的f1.thenApply
改为 f1.thenApplyAsync
来看看执行结果
(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
(2)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
f2结果:5051
可以看到和 thenApply 不一样的地方是,f2里是新开了一个线程来执行
- thenAccept
跟 thenApply 一样,接受上一步的执行结果作为入参,但是区别就是 thenAccept 没有返回值
ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int sum = 0;
try {
TimeUnit.SECONDS.sleep(3);
for (int i = 1; i <= 100; i++) {
sum += i;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("(1)我是线程:" + Thread.currentThread());
return sum;
}, pool);
CompletableFuture<Void> f2 = f1.thenAccept((result) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中的result:" + result);
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
(2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
(2)中的result:5050
f2结果:null
可以看到f2是void无返回,所有获取到的结果为null,跟 thenApply 一样,f1和f2也是用的同一个线程
- thenRun
和 thenAccept 不一样,它没有入参也没有返回值,获取f2的结果时也是null
CompletableFuture<Void> f2 = f1.thenRun(() -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
});
- exceptionally
- f1中没有异常的情况
ForkJoinPool pool = new ForkJoinPool();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int sum = 0;
try {
TimeUnit.SECONDS.sleep(3);
for (int i = 1; i <= 100; i++) {
sum += i;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("(1)我是线程:" + Thread.currentThread());
return sum;
}, pool);
CompletableFuture<Integer> f2 = f1.exceptionally((ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
ex.printStackTrace();
return 0;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main]
(1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main]
f1结果:5050
f2结果:5050
f1无异常的时候,f2返回的是f1的结果
- f1有异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
if (true) {
throw new RuntimeException("test ex");
}
return 5050;
}).exceptionally((ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
ex.printStackTrace();
return 0;
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
执行结果
可以看到,当f1中有异常的时候,触发exceptionally
中的执行逻辑
- whenComplete
- 先看f1无异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get());
System.out.println("f2结果:" + f2.get());
执行结果
(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
(2)我是线程:Thread[main,5,main]
(2)中result:5050
(3)我是线程:Thread[main,5,main]
f1结果:5050
f2结果:5050
可以看到正常无异常的情况下,f1、f2都能获取到结果,其实可以理解为f2就是f1,因为正常使用的话都是连着一起写的,比如:CompletableFuture.supplyAsync().whenComplete()
- f1有异常的情况
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
if (true) {
throw new RuntimeException("test ex");
}
return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
});
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("f1结果:" + f1.get()); // get时抛出了异常
System.out.println("f2结果:" + f2.get());
执行结果
可以看到,f1中发生了异常情况,f2中捕获到了异常,但是result没值,
因为result和ex是互斥的关系,有异常时没result,有result时没异常,同时主线程中执行f1.get()方法时,抛出了异常,导致主线程退出;
f2.get()一样会抛出异常,导致主线程中断
- handle
-
无异常的情况
f1的代码复用
whenComplete
中无异常的代码
CompletableFuture<String> f2 = f1.handle((result, ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
return "我是(2)中的执行结果";
});
执行结果
(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]
(2)我是线程:Thread[main,5,main]
(2)中result:5050
(3)我是线程:Thread[main,5,main]
f1结果:5050
f2结果:我是(2)中的执行结果
正常无异常的时候执行结果和
whenComplete
是一样的,只不过f2中可以返回自己的结果,跟f1完全不相关,类型都可以不一样
- 有异常时执行结果
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
if (true) {
throw new RuntimeException("test ex");
}
return 5050;
});
CompletableFuture<String> f2 = f1.handle((result, ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
return "我是(2)中的执行结果";
});
System.out.println("(3)我是线程:" + Thread.currentThread());
// System.out.println("f1结果:" + f1.get()); // f1触发了异常,会中断主线程
System.out.println("f2结果:" + f2.get());
执行结果
f1中触发了异常,异常以ex参数的形式传入到f2中,因为result和异常互斥,此时result为空,f2在打印了异常信息后,返回了自定义的结果,所以主线程中如果直接执行f2.get()能获取到结果,但是当f1出现异常后,在f2调用get方法之前调用了f1的get方法会结束主线程,f2相当于一个新的对象,消化了异常,返回了自己的信息,所以主线程能正常处理f2的get方法
- allOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture对象执行get方法时会抛出异常中断主线程的执行,如果都是正常执行,则get返回null
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
int i = 1 / 0;
return 100;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
return 200;
});
CompletableFuture<Void> f3 = CompletableFuture.allOf(f1, f2).whenComplete((result, ex) -> {
System.out.println("(3)我是线程:" + Thread.currentThread());
System.out.println("(3)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
});
System.out.println("(4)我是线程:" + Thread.currentThread());
System.out.println("f2结果:" + f3.get());
.junit.Test
ic void test3() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("(1)我是线程:" + Thread.currentThread());
if (true) {
throw new RuntimeException("test ex");
}
return 5050;
});
CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> {
System.out.println("(2)我是线程:" + Thread.currentThread());
System.out.println("(2)中result:" + result);
if (ex != null) {
ex.printStackTrace();
}
});
System.out.println("(3)我是线程:" + Thread.currentThread());
// System.out.println("f1结果:" + f1.get()); // get时抛出了异常
System.out.println("f2结果:" + f2.get());
执行结果