文章目录
1 ExecutorService执行批量任务的缺陷
Future接口提供了一种在未来某个时间节点判断异步任务是否完成执行、获取运算结果等操作的方式。如果在异步任务仍在继续运行之时执行get方法,会使得当前线程进入阻塞直到异步任务运行结束(正常结束/异常结束)。因此无论是通过ExecutorService提交Runnable类型的任务还是Callable类型的任务,只要你关注异步任务的运行结果,就必须持续跟踪返回Future引用。
Future除了“调用者线程需要持续对其进行关注才能获得结果”这个缺陷之外,还有一个更为棘手的问题在于,当通过ExecutorService的批量任务执行方法invokeAll来执行一批任务时,无法第一时间获取最先完成异步任务的返回结果
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
// 搞它三个任务
List<Callable<Integer>> callables = Arrays.asList(
() ->
{
// 模拟耗时30秒
sleep(30);
System.out.println("Task 30 completed done.");
return 30;
},
() ->
{
// 模拟耗时30秒
sleep(10);
System.out.println("Task 10 completed done.");
return 10;
}, () ->
{
// 模拟耗时30秒
sleep(20);
System.out.println("Task 20 completed done.");
return 20;
});
try {
// 批量提交执行异步任务,该方法会阻塞等待所有的Future返回
List<Future<Integer>> futures = executor.invokeAll(callables);
futures.forEach(future -> {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void sleep(long seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}
在上面的代码中我们定义了三个批量任务,很明显,耗时10秒的任务将会第一个被执行完成,但是很遗憾,我们无法立即使用该异步任务运算所得的结果。原因是在批量任务中存在一个拖后腿的(30秒才能运行结束),因此想要在接下来的程序运行中使用上述批量任务的结果至少还要等待30秒的时间,这对于耗时较快的任务来说是一种非常不必要的等待。
Task 10 completed done.
Task 20 completed done.
Task 30 completed done.
// 所有任务完成之后才能进行下一步的处理
30
10
20
3 CompletionService详解
CompletionService则采用了异步任务提交和计算结果Future解耦的一种设计方式,在CompletionService中,我们进行任务的提交,然后通过操作队列的方式(比如take或者poll)来获取消费Future。
CompletionService并不是ExecutorService的子类,因此它并不具备执行异步任务的能力(异步任务的执行是由CompletionService内部的ExecutorService来完成的),它只是对Executor-Service的一个封装,在其内部提供了阻塞队列用于Future的消费
ExecutorCompletionService就是CompletionService的一个实现
public class ExecutorCompletionService<V> implements CompletionService<V> {
// 持有Executor
private final Executor executor;
private final AbstractExecutorService aes;
// 在其内部提供了阻塞队列用于Future的消费
private final BlockingQueue<Future<V>> completionQueue;
CompletionService很好地解决了异步任务的问题,在CompletionService中提供了提交异步任务的方法(真正的异步任务执行还是由其内部的ExecutorService完成的),任务提交之后调用者不再关注Future,而是从BlockingQueue中获取已经执行完成的Future,在异步任务完成之后Future才会被插入阻塞队列,也就是说调用者从阻塞队列中获取的Future是已经完成了的异步执行任务,所以再次通过Future的get方法获取结果时,调用者所在的当前线程将不会被阻塞。
3.1 快速入门
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
// 定义CompletionService使用ExecutorService
CompletionService<Integer> completionService
= new ExecutorCompletionService<>(executor);
// 搞它三个任务
List<Callable<Integer>> callables = Arrays.asList(
() ->
{
// 模拟耗时30秒
sleep(30);
System.out.println("Task 30 completed done.");
return 30;
},
() ->
{
// 模拟耗时30秒
sleep(10);
System.out.println("Task 10 completed done.");
return 10;
}, () ->
{
// 模拟耗时30秒
sleep(20);
System.out.println("Task 20 completed done.");
return 20;
});
// 批量提交执行异步任务,该方法会阻塞等待所有的Future返回
callables.forEach(completionService::submit);
// 从completionService中获取已完成的Future,take方法会阻塞
for(int i = 0; i < callables.size(); i++) {
try {
Integer integer = completionService.take().get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
private static void sleep(long seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}
此时就不会出现第一节的问题:
Task 10 completed done.
10
Task 20 completed done.
20
Task 30 completed done.
30
3.2 方法和构造方式
3.2.1 构造
CompletionService并不具备异步执行任务的能力,因此要构造CompletionService则需要ExecutorService,当然还允许指定不同的BlockingQueue实现。
// BlockingQueue默认为LinkedBlockingQueue(可选边界)。
ExecutorCompletionService(Executor executor);
// 允许在构造时指定不同的BlockingQueue。
ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue);
3.2.2 提交任务
// 提交Callable类型的任务
Future<V> submit(Callable<V> task)
// 提交Runnable类型的任务
Future<V> submit(Runnable task, V result);
3.2.3 获取futrue
// 从CompletionService的阻塞队列中获取已执行完成的Future,如果此刻没有一个任务完成则会立即返回null值。
// 非阻塞
Future<V> poll();
// 同上,指定了超时设置。
Future<V> poll(long timeout, TimeUnit unit);
// 会使当前线程阻塞,直到在CompletionService中的阻塞队列有完成的异步任务Future。
Future<V> take() throws InterruptedException;