前言
CompletableFuture是对Future的一种强有力的扩展,Future只能通过轮询isDone()方法或者调用get()阻塞等待获取一个异步任务的结果,才能继续执行下一步,当我们执行的异步任务很多,而且相互之前还要依赖结果的时候,可能会创建很多这样的Future,并通过get或者轮询等待执行结果返回之后继续执行,这样的代码显得很不方便而且也不高效。
通过前面的CompletionStage接口给我们提供了一系列将多个阶段(甚至是异步的)的结果相互关联执行的方法,如果把它和Future结合起来,那么可将这种便利与高效编程方式用于异步任务的执行。CompletableFuture就是这样的一个类,同时继承了CompletionStage和Future,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过lambda表达式的风格处理各个执行阶段的结果。
实现
CompletableFuture通过以下策略实现了接口CompletionStage:
- 依赖的非异步阶段提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。
- 所有没有显式指定Executor参数的异步方法都使用ForkJoinPool.commonPool执行(除非它不支持至少两个并行级别,否则将创建一个新线程来运行每个任务).为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture.AsynchronousCompletionTask的实例。
- 所有实现CompletionStage的接口方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。
CompletableFuture通过以下策略实现了接口Future:
- CompletableFuture将取消看作是异常完成的另一种形式。cancel方法具有与completeExceptionally(new CancellationException())相同的效果。
- 在以CompletionException异常完成的情况下,get()和get(long, TimeUnit)方法抛出一个ExecutionException,其异常原因与对应的CompletionException中的原因相同。为了简化在大多数上下文中的使用,这个类还定义了join()和getNow方法,它们在这种情况下直接抛出CompletionException。
以下是CompletableFuture的内部实现概述:
由于CompletableFuture可以依赖其他一个甚至多个CompletableFuture,所以在内部实现的时候,每一个CompletableFuture都拥有一个依赖操作栈,栈中的元素是Completion的子类,它包含相关的操作、CompletableFuture以及源操作。当一个CompletableFuture完成之后会从栈中弹出并递归执行那些依赖它的CompletableFuture。由于依赖栈中的那些Completion元素也包含CompletableFuture对象,其CompletableFuture对象可能也拥有一个依赖栈,因此将形成一个非常复杂的依赖树。
CompletableFuture对每一种形式的实现使用了不同的Completion子类,例如:单输入(UniCompletion)、双输入(BiCompletion)、投影(使用BiCompletion两个输入中的任何一个(而不是两个)的双输入)、共享(CoCompletion,由两个源中的第二个使用)、零输入(不消费不产出的Runnable)操作和解除阻塞等待(get()、join()方法)的信号器Signallers。Completion类扩展了ForkJoinTask来启用异步执行(不增加空间开销,因为我们利用它的“标记”方法来维护声明).它还被声明为Runnable,可以被任意线程池调度执行。
CompletableFuture又在UniCompletion、BiCompletion、CoCompletion等这几种Completion子类的基础上扩展出了实现CompletionStage具体接口方法的前缀为"Uni", "Bi", "Or"的子类。例如实现单个输入、两个输入、两者之一的thenApply对应的就是UniApply、BiApply、OrApply。CompletableFuture在实现CompletionStage接口方法甚至自己独有的方法使都采用了相同的模式,以及调度策略,因此只要立即了一种方法的实现,其他方法都是类似的原理。
源码简述
runAsync/supplyAsync
虽然CompletableFuture提供了无参的构造方法,但我们一般从它的静态方法开始,根据是否有返回值,它对外提供了两种形式的执行异步任务的方法:
1 //执行无返回值的异步任务 2 public static CompletableFuture<Void> runAsync(Runnable runnable) 3 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 4 5 //执行有返回值的异步任务 6 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
它们都以async为后缀,根据CompletionStage的接口定义规律也可以知道是通过异步安排执行,又比如方法中带有run表示不消费也不产出型方法,再如,参数带有Executor的用自定义的线程池调度执行,否则使用默认的ForkJoinPool.commonPool执行。对于不支持并行运算的环境,例如单核CPU,CompletableFuture默认将采用一个任务创建一个Thread实例的方式执行。
我们以supplyAsync(Supplier<U> supplier)方法为例,继续向下分析:
1 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 2 return asyncSupplyStage(asyncPool, supplier); 3 } 4 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 5 Supplier<U> f) { 6 if (f == null) throw new NullPointerException(); 7 CompletableFuture<U> d = new CompletableFuture<U>(); //新创建一个CompletableFuture 8 e.execute(new AsyncSupply<U>(d, f)); //安排异步执行 9 return d; //立即返回 10 }View Code
可见,supplyAsync具体的实现调用了asyncSupplyStage,这也是CompletableFuture的内部实现惯例,每一种方法的实现都对应一个xStage方法。用于创建stage对象(这里就是实现了CompletionStage接口的CompletableFuture),并安排任务的执行。这里由于是异步任务,所以直接创建了异步任务实例AsyncSupply,然后交给线程池执行。接着看AsyncSupply实现:
1 //实现了ForkJoinTask,Runnable可以被ForkJoinPool,或者其他实现Executor的自定义线程池调度 2 static final class AsyncSupply<T> extends ForkJoinTask<Void> 3 implements Runnable, AsynchronousCompletionTask { 4 CompletableFuture<T> dep; //依赖的CompletableFuture 5 Supplier<T> fn; //具体的任务执行逻辑 6 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { 7 this.dep = dep; this.fn = fn; 8 } 9 10 public final Void getRawResult() { return null; } 11 public final void setRawResult(Void v) {} 12 //exec由ForkJoinPool调度执行,最终直接调用run 13 public final boolean exec() { run(); return true; } 14 15 //由ForkJoinPool间接调度,或其他自定义线程池直接调用 16 public void run() { 17 CompletableFuture<T> d; Supplier<T> f; 18 if ((d = dep) != null && (f = fn) != null) { 19 dep = null; fn = null; 20 if (d.result == null) { //result为空表示任务还没完成 21 try { 22 //执行任务并将结果设置到依赖的CompletableFuture 23 d.completeValue(f.get()); 24 } catch (Throwable ex) { 25 //异常完成的情况 26 d.completeThrowable(ex); 27 } 28 } 29 //从依赖栈中弹出并触发执行依赖当前CompletableFuture的其他阶段 30 d.postComplete(); 31 } 32 } 33 }View Code
AsyncSupply实现了ForkJoinTask,Runnable是为了兼容ForkJoinPool线程池和其他自定义的Executor线程池实现,run方法就是线程调度时执行任务的逻辑,就是执行给定的操作,并将结果设置到当前任务对应的CompletableFuture对象d(也就是依赖该任务的阶段),最后通过d.postComplete触发其他依赖阶段d的其他任务执行。postComplete的逻辑如下:
1 //递归触发其他依赖当前阶段的其他阶段执行 2 final void postComplete() { 3 /* 4 * On each step, variable f holds current dependents to pop and run. 5 * It is extended along only one path at a time, pushing others to avoid unbounded recursion. 6 */ 7 CompletableFuture<?> f = this; Completion h; 8 while ((h = f.stack) != null || //f对应的栈不为空 9 (f != this && (h = (f = this).stack) != null)) { //f对应的栈为空了,重新回到this,继续另一条路径 10 CompletableFuture<?> d; Completion t; 11 if (f.casStack(h, t = h.next)) { //将f的栈顶元素h出栈 12 if (t != null) { //表示出栈的h不是最后一个元素 13 if (f != this) { //f不是this,即不是当前栈 14 pushStack(h); //将f出栈的元素h压入当前的栈,这里是为了避免递归层次太深 15 continue; 16 } 17 h.next = null; // detach 辅助GC 18 } 19 //tryFire就是触发当前栈的栈顶h被执行,完成之后又返回依赖h的其它CompletableFuture, 20 //使其通过while循环继续触发依赖它的其余阶段任务的执行 21 f = (d = h.tryFire(NESTED)) == null ? this : d; 22 } 23 } 24 }View Code
postComplete的代码很简短,但是其代表的逻辑含义确非常不容易立即,这主要是因为形成的依赖树结构复杂,总之,postComplete就是递归的触发依赖当前阶段的其他任务的执行,它一次只沿着一条路径将其压入当前栈,避免递归调用的层次太深。具体的触发其他任务的执行是通过内嵌模式的tryFire方法来完成的,嵌套模式只用于这里。为了理解tryFire,我们再以thenApply为例。
CompletionStage实现之thenApply
这是实现CompletionStage的接口方法thenApply,它包含三种形式(同步、默认线程池的异步、指定线程池的异步)这里我以同步模式为例,一般来说,我们都会以下面这种形式使用thenApply:
1 CompletableFuture.supplyAsync(() -> { 2 try { 3 Thread.sleep(3000); 4 } catch (InterruptedException e) { 5 e.printStackTrace(); 6 } 7 return "hello"; 8 }).thenApply(s -> s + " world") 9 .thenAccept(System.out::println);
按照CompletableFuture的惯例,thenApply由一个uniApplyStage方法实现,创建一个新的CompletableFuture,并安排任务执行:
1 private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) { 2 if (f == null) throw new NullPointerException(); 3 CompletableFuture<V> d = new CompletableFuture<V>(); //创建CompletableFuture实例 4 if (e != null //Executor不为空,表示需要安排异步执行 5 || !d.uniApply(this, f, null)) { //尝试立即同步执行 6 //需要被安排异步执行,或者依赖的上一个阶段this还没完成,需要等待 7 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); //构造UniApply实例 8 push(c); //将当前任务入栈,注意这里入的上一个阶段即this的栈,作为依赖其的阶段 9 c.tryFire(SYNC); // 10 } 11 return d; 12 }View Code
对于同步任务,在入栈等待前会通过一个布尔型的uniApply方法先尝试安排执行这个任务,这个布尔型的方法也是CompletableFuture实现其他多种形式的方法的惯例,对应每一种形式的方法实现都有一个这样的返回布尔型的方法:uniAccept、uniRun、uniWhenComplete、uniHandle、uniExceptionally、uniCompose、biApply等等。
1 //根据依赖的上一个阶段a是否完成,看要不要立即安排当前任务执行 2 //返回true表示已经同步完成执行了当前任务。为false表示依赖的阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话) 3 final <S> boolean uniApply(CompletableFuture<S> a, 4 Function<? super S,? extends T> f, 5 UniApply<S,T> c) { 6 Object r; Throwable x; 7 if (a == null || (r = a.result) == null || f == null) 8 return false; //表示依赖的阶段a还没完成,还不能执行当前阶段 9 tryComplete: if (result == null) { //依赖的阶段a已经完成,当前阶段还没完成 10 if (r instanceof AltResult) { 11 //如果依赖的阶段a是异常结束,那么当前阶段也异常结束 12 if ((x = ((AltResult)r).ex) != null) { 13 completeThrowable(x, r); 14 break tryComplete; 15 } 16 r = null; 17 } 18 //到这里表示依赖的阶段a是正常结束 19 try { 20 if (c != null && !c.claim()) 21 return false; //只有在c不为空,并且不能被执行或者已经安排异步执行才会返回false 22 23 //拿到已经完成的依赖阶段a的结果,执行同步执行当前任务,并把结果设置到当前CompletableFuture阶段 24 @SuppressWarnings("unchecked") S s = (S) r; 25 completeValue(f.apply(s)); 26 } catch (Throwable ex) { 27 //异常完成的处理 28 completeThrowable(ex); 29 } 30 } 31 return true; 32 } 33 34 35 //通过自定义TAG,标记任务正在被执行,保证任务只会被执行一次。 36 //该方法只会在不能被执行或者已经安排异步执行才会返回false 37 final boolean claim() { 38 Executor e = executor; 39 //解锁成功,表示可以执行了 40 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 41 if (e == null) //需要被安排同步执行,立即返回true 42 return true; 43 executor = null; // disable 赋值GC 44 e.execute(this); //否则立即安排异步执行 45 } 46 return false; 47 }View Code
这个布尔型的方法返回true表示已经同步完成执行了当前任务。为false表示依赖的上一个阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话),其中的claim方法通过CAS加锁保证任务只会被执行一次,同时还可以安排异步任务的执行。
回到uniApplyStage,如果是异步任务,或者还不能立即执行的同步任务(因为上一个阶段还没结束),则创建UniApply实例,并入栈,但在入栈之后,还会通过tryFire进行一次尝试同步执行,下面来看其tryFire实现:
1 //UniCompletion是Completion的子类,Completion继承了ForkJoinTask,实现了Runnable, AsynchronousCompletionTask 2 //UniCompletion的子类,主要就是实现tryFire 3 static final class UniApply<T,V> extends UniCompletion<T,V> { 4 Function<? super T,? extends V> fn; 5 UniApply(Executor executor, CompletableFuture<V> dep, 6 CompletableFuture<T> src, 7 Function<? super T,? extends V> fn) { 8 super(executor, dep, src); this.fn = fn; 9 } 10 11 //根据不同的模式,尝试触发执行 12 final CompletableFuture<V> tryFire(int mode) { 13 CompletableFuture<V> d; CompletableFuture<T> a; 14 if ((d = dep) == null || //已经执行过的阶段的dep才会为null 15 !d.uniApply(a = src, fn, mode > 0 ? null : this)) //尝试执行 16 return null; //已经安排异步任务异步执行,或者同步任务需要等待,返回null 17 dep = null; src = null; fn = null; 18 return d.postFire(a, mode); //同步任务已经执行完成,触发依赖它的其他阶段执行 19 } 20 }View Code
首先,UniApply是UniCompletion的子类,UniCompletion是Completion的子类,Completion继承了ForkJoinTask,实现了Runnable, AsynchronousCompletionTask ,所以不论是同步任务还是异常任务其实都是 AsynchronousCompletionTask 的实现类。
可见tryFire还是调用uniApply方法尝试执行的,不过这时候其第三个参数c不再是null,而是当前任务。因此会有机会执行claim方法来安排异步任务被线程池调度执行。在同步任务完成之后,通过postFire清理栈,并触发其他依赖该阶段的其他阶段执行。
1 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 2 if (a != null && a.stack != null) { //源栈不为空 3 if (mode < 0 || a.result == null) //是内嵌模式或者源阶段a还没完成,清理一下其栈 4 a.cleanStack(); 5 else 6 a.postComplete(); //否则不是内嵌模式,并且源阶段a已经结束,继续触发依赖该阶段的其他阶段 7 } 8 if (result != null && stack != null) { //当前阶段已经完成,并且有依赖它的其他阶段 9 if (mode < 0) //内嵌模式,返回当前阶段 10 return this; 11 else 12 postComplete(); //同步或者异常模式,触发依赖它的其他阶段执行 13 } 14 return null; 15 }View Code
这里的postFire中mode小于0的内嵌模式就是上面supplyAsync中postComplete的while循环中传递给tryFire的参数,它会返回this,避免了递归太深。
通过以上源码的分析,可见跟在supplyAsync/runAsync异步阶段后面的同步阶段的执行可能会是调用整个阶段的外部主线程,也可能是执行异步阶段的线程池中的线程。如果在安排同步任务的时候,刚好上一个异步阶段已经结束,那么就会使用外部主线程执行,否则入栈之后,在异步任务完成后,会通过内嵌的方式由执行异步任务的线程池中的线程调度执行,而异步任务则始终会被线程池中的线程调度执行。
实现CompletionStage的其他接口方法都是类似thenApply相同的套路模式,就不一一列举了。其中有一个与用户方法不对应的“Relay”类/方法.它们将结果从一个阶段复制到另一个阶段,用于辅助实现其中的一些方法。
Future实现
cancel和isCancelled
说完了关于CompletionStage的实现,接下来介绍一下实现Future的接口,首先从最简单的取消开始,cancel / isCancelled:
1 //如果尚未完成,则使用CancellationException异常完成此CompletableFuture。 2 //尚未完成的依赖CompletableFutures也将以一个CancellationException导致的CompletionException异常完成。 3 public boolean cancel(boolean mayInterruptIfRunning) { 4 boolean cancelled = (result == null) && //还没完成 5 internalComplete(new AltResult(new CancellationException())); //尝试以CancellationException异常完成 6 postComplete(); //触发依赖它的其他阶段也异常结束 7 return cancelled || isCancelled(); 8 } 9 10 //如果此CompletableFuture在正常完成之前被取消,则返回true。 11 public boolean isCancelled() { 12 Object r; 13 //若结果是CancellationException异常,则表示是被取消的 14 return ((r = result) instanceof AltResult) && 15 (((AltResult)r).ex instanceof CancellationException); 16 }View Code
可见,cancel只会尝试取消还没完成的CompletableFuture(即还没有设置结果字段result),由于cancel 的参数mayInterruptIfRunning并没有使用,一旦任务已经在执行了,则不会中断其执行。取消行为只会在其未完成之前修改其结果指示其已经被取消,即使已经处于执行中的任务最后成功完成也不能再修改其结果。取消成功(即把result成功修改为被取消状态)的将会以CancellationException异常完成。不论取消是否成功,都会通过postComplete递归的触发依赖当前阶段的其他任务的尝试执行。
isDone()方法只要有了结果(result字段不为null),不论是正常还是异常结束都会返回true,由于CompletableFutures内部实现的时候将本身返回null的结果包装成了AltResult对象,所以当返回null结果时也不例外。
get()
对Future的实现最重要的就是非get莫属了:
1 //阻塞等待结果,可以被中断。 2 public T get() throws InterruptedException, ExecutionException { 3 Object r; 4 return reportGet((r = result) == null ? waitingGet(true) : r); 5 } 6 //在等待之后返回原始结果,如果是可中断或已经中断的,则返回null。 7 private Object waitingGet(boolean interruptible) { 8 Signaller q = null; 9 boolean queued = false; 10 int spins = -1; 11 Object r; 12 while ((r = result) == null) { //只要没完成就继续 13 if (spins < 0) //设置多处理上的自旋次数 14 spins = (Runtime.getRuntime().availableProcessors() > 1) ? 15 1 << 8 : 0; // Use brief spin-wait on multiprocessors 16 else if (spins > 0) { //自旋 17 if (ThreadLocalRandom.nextSecondarySeed() >= 0) 18 --spins; 19 } //自旋结束,还没完成,初始化Signaller 20 else if (q == null) 21 q = new Signaller(interruptible, 0L, 0L); 22 else if (!queued) //将Signaller入栈 23 queued = tryPushStack(q); 24 else if (interruptible && q.interruptControl < 0) { //可以被中断,并且已经被中断 25 q.thread = null; //辅助GC 26 cleanStack(); //清理一下其栈 27 return null; 返回null 28 } 29 else if (q.thread != null && result == null) { 30 try { 31 ForkJoinPool.managedBlock(q); //阻塞等待 32 } catch (InterruptedException ie) { 33 q.interruptControl = -1; //被中断了 34 } 35 } 36 } 37 38 //到这里说明result已经有了结果了 39 if (q != null) { 40 q.thread = null; //辅助GC 41 if (q.interruptControl < 0) { //有被中断过 42 if (interruptible) 43 r = null; // report interruption //若支持中断,则返回null 44 else 45 Thread.currentThread().interrupt(); //不支持中断,则补偿中断标记 46 } 47 } 48 postComplete(); //递归触发其他依赖当前阶段的其他阶段执行 49 return r; //支持中断并且被中断过,返回null,否则返回原始结果 50 } 51 52 //使用Future报告结果。 53 private static <T> T reportGet(Object r) 54 throws InterruptedException, ExecutionException { 55 if (r == null) // 结果为null表示可以被中断,并且被中断了,立即抛出中断异常 56 throw new InterruptedException(); 57 if (r instanceof AltResult) { //空结果或者异常结果 58 Throwable x, cause; 59 if ((x = ((AltResult)r).ex) == null) //空结果返回null 60 return null; 61 if (x instanceof CancellationException) //被取消的,返回CancellationException异常 62 throw (CancellationException)x; 63 if ((x instanceof CompletionException) && 64 (cause = x.getCause()) != null) //CompletionException异常结果的,返回导致其异常结束的异常 65 x = cause; 66 throw new ExecutionException(x); //其他运行时异常,包装成CompletionException异常抛出 67 } 68 @SuppressWarnings("unchecked") T t = (T) r; //非异常结束,返回正常结果 69 return t; 70 }View Code
先说结果:get()方法是可以被中断的,因此发生中断的话将会抛出InterruptedException(即使已经拿到结果),被取消的则会抛出CancellationException异常,任务执行产生的其他异常导致异常结束的,其异常会被封装成ExecutionException抛出。
其实现过程首先使用waitingGet方法返回获得的结果,如果支持中断并且被中断过则返回null,否则返回响应的结果,等待的过程采用了自旋 + ForkJoinPool.managedBlock方式,它将调用get方法等待结果也视为一种任务,构造成Completion的子类Signaller进入被等待执行结束的CompletableFutures依赖栈,一旦它完成就会通过postComplete方法的tryFire触发执行,其tryFire会唤醒阻塞的线程,从而使get方法返回,值得注意的是,这里阻塞使用了 ForkJoinPool.managedBlock的方法,因为阻塞的线程可能是ForkJoinPool线程池中的工作线程,为了不让线程池中的任务由于过多的工作线程被阻塞导致饥饿堆积等待, ForkJoinPool.managedBlock在阻塞的时候会激活新的线程补偿当前阻塞的线程,保证线程池的并行性。
最后,reportGet方法根据waitingGet方法返回的结果,该跑异常的跑异常,该返回正常结果的,返回正常结果。
超时版本的get方法,其实现阻塞等待的方法timedGet方法与waitingGet原理差不多,除了因为也支持中断可能抛出InterruptedException之外,还可能会因超时抛出TimeoutException,就不再详细说明了。
CompletableFutures自己的方法
T join()
join方法不是实现Future接口的方法,是CompletableFutures自己的方法,它与get方法的作用都是等待执行返回结果,但是它不支持中断,必须要等到执行结果:
1 public T join() { 2 Object r; 3 return reportJoin((r = result) == null ? waitingGet(false) : r); 4 } 5 6 //解析结果,或者抛出未捕获的异常 7 private static <T> T reportJoin(Object r) { 8 if (r instanceof AltResult) { //空结果或者异常结果 9 Throwable x; 10 if ((x = ((AltResult)r).ex) == null) //空结果返回null 11 return null; 12 if (x instanceof CancellationException) 13 throw (CancellationException)x; //被取消的,返回CancellationException异常 14 if (x instanceof CompletionException) 15 throw (CompletionException)x; //CompletionException异常 16 throw new CompletionException(x); //其他异常也被包装成CompletionException异常 17 } 18 @SuppressWarnings("unchecked") T t = (T) r; 19 return t; 20 }View Code
可见,join与get方法的区别有两点:一、不支持中断,一定要等到执行有了结果才会返回,但若被中断过,在返回前会恢复中断标记;二、除了被取消抛出CancellationException,其他异常一律被包装成CompletionException抛出。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
当所有给定的CompletableFutures都执行结束时,它就完成,或者其中任何一个异常结束,它也会立即以同样的异常结束。列表中任何一个CompletableFuture的结果都不会反映到返回的CompletableFuture中,但可以遍历列表通过其get方法获取。如果一个CompletableFuture都没指定,即cfs为长度为0的空数组,那么将返回一个完成结果为null的CompletableFuture。该方法的应用之一是,在继续一个程序之前,等待一组相互独立的任务完成,例如:CompletableFuture.allOf(c1, c2, c3).join();
由于allOf返回的是无返回结果的CompletableFuture,因此不能直接获取那些列表中每一个CompletableFuture的结果,例如这样将打印出null:
System.out.println(CompletableFuture.allOf(a,b,c,....).thenApply(V -> V).join());
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
当给定的CompletableFutures其中任何一个执行结束时,不论是正常还是异常结束,它就以相同的结果或者异常结束。注意异常结束的会被包装成CompletionException异常。如果一个CompletableFuture都没指定,即cfs为长度为0的空数组,那么将返回一个执行没有结束的CompletableFuture。注意,如果给定的cfs数组中的CompletableFuture其返回类型不一致,那么anyOf的最终返回的CompletableFuture的结果的类型也将不确定。不像allOf返回的Void的结果,anyOf可以用返回的结果继续处理,例如:
1 CompletableFuture.anyOf( 2 CompletableFuture.supplyAsync(() -> "Tom"), 3 CompletableFuture.supplyAsync(() -> "John"), 4 CompletableFuture.supplyAsync(() -> "Jack") 5 ).thenApply(name -> "hello "+ name) 6 .thenAccept(System.out::println);View Code
T getNow(T valueIfAbsent)
1 public T getNow(T valueIfAbsent) { 2 Object r; 3 return ((r = result) == null) ? valueIfAbsent : reportJoin(r); 4 }View Code
如果已经执行完成,就返回与执行join时相同的结果:即被取消的抛出CancellationException,正常结果的返回结果,其他异常一律被包装成CompletionException抛出。如果调用该方法的时候还没执行结束,则返回指定的结果。注意,在发现执行还没有结束时,该方法并没有强制使执行结束。
public boolean complete(T value)
1 public boolean complete(T value) { 2 //以指定的值设置到结果result字段,只会在result还是null的时候成功 3 boolean triggered = completeValue(value); 4 postComplete(); 5 return triggered; 6 }View Code
如果执行还没结束,就以指定的值作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该result,所以如果此方法成功,那么调用该阶段的get、join方法返回的值就是该方法指定的value值。
public boolean completeExceptionally(Throwable ex)
如果还没执行结果,就以指定的异常作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该result,所以如果此方法成功,那么调用该阶段的get、join方法也将抛出异常,不同的是,get可能抛出被封装成ExecutionException的异常,join可能抛出被封装成CompletionException的异常。
public void obtrudeValue(T value) 和 public void obtrudeException(Throwable ex)
强制以指定的值或者异常作为当前阶段的执行结果,不论其是否已经完成。与complete和completeExceptionally不同,complete和completeExceptionally只会在执行还没结束的情况,更新结果。在这只后返回的get、join也将返回相应的结果或异常。这两方法一般用于错误恢复,但也不一定有用。
public boolean isCompletedExceptionally()
如果执行以异常完成,则该方法返回true,这里的异常结束包括被取消,被completeExceptionally和obtrudeException方法主动异常结束,当然也包括任务执行过程中异常结束的情况。
public int getNumberOfDependents()
返回等待该CompletableFutures完成的其他CompletableFutures的估计个数,只是一个瞬态值,一般用于监控系统状态。
构造方法
除了使用静态方法runAsync,supplyAsync之外,构造一个CompletableFutures还可以通过以下几种途径,首先就是构造方法:
CompletableFutures提供了两个构造方法,一个无参,一个可以指定结果:
1 public CompletableFuture() { 2 } 3 4 private CompletableFuture(Object r) { 5 this.result = r; 6 }View Code
另一种就是使用静态方法completedFuture:
public static <U> CompletableFuture<U> completedFuture(U value)
其内部还是使用的有参的构造方法返回一个CompletableFuture实例。利用构造无参的构造方法可以这些实现异步任务的执行:
1 public static CompletableFuture<String> asyncDoSomething(String a, String b){ 2 CompletableFuture future = new CompletableFuture(); 3 new Thread(() -> { 4 try{ 5 Thread.sleep(TimeUnit.SECONDS.toSeconds(10000)); 6 future.complete(a + b); 7 }catch (Exception e){ 8 future.completeExceptionally(e); 9 } 10 }).start(); 11 return future; 12 } 13 public static void main(String[] args) { 14 asyncDoSomething("hello"," world").thenAccept(System.out::println); 15 }View Code
但,一般我们也不会这样用了,我这里只是为了举一个例子而已。
结束语
本文简要介绍了CompletableFuture的实现源码,其是CompletionStage与Future接口的实现类,它提供了大量的用于异步或同步执行任务的方法,这些方法的使用大都在上一篇CompletionStage的讲述中做了介绍,其通过链表栈形成的树形结构组织那些具有依赖关系的各个阶段CompletableFuture,异步任务的执行默认通过ForkJoinPool.commonPool执行,除非指定了Executor参数,而同步任务的执行又可能是被主线程执行,也可能是完成上一个阶段的线程池中的线程执行。
另外所有实现CompletionStage的接口方法都是独立于其他公共方法实现的,基本上每一个方法都对应了一个内部类,因此一个方法的行为不会受到子类中其他方法重写的影响。
实现Future的cancel方法不会中中断已经被安排执行的任务,仅仅是在任务的结果还没被回写之前,更新其结果为被取消状态,一旦将其结果设置为被取消状态,还没有开始仔执行的将不会被调度执行,已经在执行的,最后就算正常完成也不能再修改结果。
Future.get与CompletableFuture的join方法除了join不能被中断之外,对异常结束分别会将异常包装成ExecutionException、CompletionException,当然InterruptedException、TimeoutException、CancellationException除外,通常我们都会使用join而不是get,大概是join方法不需要处理异常,而get方法有InterruptedException、TimeoutException异常需要处理吧。