概述
想要知道线程的执行结果,除了FutureTask和CompletableFuture之外,CompletionService是另一个不错的选择。
CompletionService是一个Java8新增的泛型接口,其实现类ExecutorCompletionService,用于主线程提交多个任务后,任务完成即处理结果,并按照任务完成顺序逐个处理。这个类是为线程池中Task的执行结果服务的,即为Executor中Task返回Future而服务的。
CompletionService以异步的方式一边生产新的任务,一边处理已完成的任务的结果。这样可以将执行任务与处理处理分离开来处理。使用submit执行任务,使用take取得已经完成的任务。内部使用Executor框架和BlockingQueue来实现的。
原理
CompletionService源码:
public interface CompletionService<V> {
// 提交一个Callable类型任务,并返回该任务执行结果关联的Future
Future<V> submit(Callable<V> task);
// 提交一个Runnable类型任务,并返回该任务执行结果关联的Future
Future<V> submit(Runnable task,V result);
// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成
Future<V> take();
// 从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞
Future<V> poll();
// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
实现类ExecutorCompletionService,该类只有三个成员变量,源码:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
// ExecutorService的扩展,可以获得线程执行结果的
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
// 略
}
类图如下:
ExecutorCompletionService主要是增强executor线程池的。Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。
ExecutorCompletionService 实现类依赖于 Executor 完成实际的任务提交执行,自己主要负责结果的排队处理,AbstractExecutorService.invokAny 实现就依赖此类,ExecutorCompletionService 内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
每个提交给 Executor 的任务都是通过继承 FutureTask 封装过的,FutureTask 在任务结束后会回调 done 方法,所以 ExecutorCompletionService 就在继承 FutureTask 封装重写的 done 方法中将当前 FutureTask 加入额外队列,然后通过其 take 或者 poll 方法获取的实质就是从这个额外队列中取数据。
从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加额外的等待时间。而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束时,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
先完成的必定先被取出,减少不必要的等待时间。ExecutorCompletionService 类提供此方法的一个实现。
实例
// todo