背景介绍
有时候我们需要执行一批相似的任务,并且要求这些任务能够并行执行。通常,我们的需求会分为两种情况:
- 并行执行一批任务,等待耗时最长的任务完成之后,再处理所有任务的结果。
- 并行执行一批任务,依次处理完成的任务结果(哪个任务先执行完就先处理哪个)。
这篇文章要介绍的两种批量执行任务的方式,正好对应了上述两种情况,下面分别介绍在Java中,如何使用并发包里面的API完成我们的需求。
使用ExecutorSevice#invokeAll()
通过向线程池提交一组任务,可以实现上述第一种批量执行的需求。下面来看具体例子。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建两个任务
Callable<Integer> task1 = () -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
return 1;
};
Callable<Integer> task2 = () -> {
TimeUnit.SECONDS.sleep(10);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
return 2;
};
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
// 创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 调用invokeAll方法,批量提交一组任务
List<Future<Integer>> futures = service.invokeAll(tasks);
for (Future<Integer> future : futures) {
// 在这里获取任务的返回值时,会等待所有任务都执行完才返回结果
Integer result = future.get();
System.out.println("[" + Thread.currentThread().getName() + "]" + " " + result);
}
// 关闭线程池
service.shutdown();
}
}
执行结果:
使用CompletionService
CompletionService也可以用来提交一组任务,让这些任务并行执行,任何一个任务执行完之后别的线程可以立即获得计算结果。看一下具体例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建两个任务
Callable<Integer> task1 = () -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
return 1;
};
Callable<Integer> task2 = () -> {
try {
TimeUnit.SECONDS.sleep(10);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
}catch (InterruptedException e) {
// 第二个任务耗时长,等第一个任务完成之后,手动取消第二个任务的执行,此时第二个任务可能会收到中断。
e.printStackTrace();
}
return 2;
};
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 使用CompletionService批量提交任务
CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
List<Future<Integer>> futures = new ArrayList<>();
for (Callable<Integer> task : tasks) {
Future<Integer> f = cs.submit(task);
futures.add(f);
}
// 获取第一个执行完的任务结果
Future<Integer> f = cs.take();
System.out.println("[" + Thread.currentThread().getName() + "]" + f.get());
futures.remove(f);
// 取消其他任务
for (Future<Integer> future : futures) {
future.cancel(true);
}
// 关闭线程池
pool.shutdown();
}
}
运行结果:
第一个任务执行完之后,第二个任务还在运行中,但是我们已经不关心其计算结果了,手动取消这个任务的执行,因此第二个任务会收到一个中断。