一、ExecutorCompletionService
通常在执行一批需要返回结果的任务时,我们可以使用线程池来提高程序运行效率,通过线程池的 submit(Callable task) 不断提交异步任务,并将 Future 保存下来,之后遍历 Future,调用 get() 方法获取结果。
虽然任务都是异步执行的,但是 get Future 结果是阻塞的。例如第一个 future 需要计算5s才能返回结果,但是其他 future 不到1s就会返回计算结果,需要等待第一个 future 结果返回才能 get 到其他 future 的结果,这就白白浪费了很多时间。
此时就可以使用 ExecutorCompletionService,它实现了 CompletionService 接口。它的内部有一个先进先出的阻塞队列,用于保存执行结束的 future,通过调用 ExecutorCompletionService 的 take 方法,就可以获取到第一个已经执行完成的 Future,之后再调用 future 的 get 方法,就可以获取到最终的结果。
动动发财小手,关注 + 点赞 + 收藏不迷路。
二、Demo演示
ExecutorCompletionService 示例代码如下:
package thread;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.ZonedDateTime;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author: tinker
* @Date: 2022/02/17 13:21
*/
public class ExecutorCompletionServiceDemo {
private static final String THREAD_FACTORY_NAME_FORMAT = "-pool-%d";
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 10;
private static final int KEEP_ALIVE_TIME = 60;
private static final int LINKED_BLOCKING_QUEUE_CAPACITY = 1000;
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("executor-test" + THREAD_FACTORY_NAME_FORMAT)
.build();
Executor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(LINKED_BLOCKING_QUEUE_CAPACITY),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
CompletionService<ExecutorCompletionServiceDemo.Result> completionService = new ExecutorCompletionService(executor);
for (int i = 1; i <= 5; i++) {
completionService.submit(new Task("task" + i, i));
}
System.out.println("print result");
for (int i = 1; i <= 5; i++) {
Future<Result> future = completionService.take();
System.out.println(future.get().result);
}
System.exit(0);
}
public static class Task implements Callable<ExecutorCompletionServiceDemo.Result> {
private String taskName;
private long sleepSeconds;
Task(String taskName, long sleepSeconds) {
this.taskName = taskName;
this.sleepSeconds = sleepSeconds;
}
@Override
public ExecutorCompletionServiceDemo.Result call() throws Exception {
Thread.sleep(sleepSeconds * 1000);
return new Result(Thread.currentThread().getName() + ": " + taskName + " run end, time = " + ZonedDateTime.now());
}
}
public static class Result {
private String result;
Result(String result) {
this.result = result;
}
}
}
输出如下:
print result
executor-test-pool-4: task1 run end, time = 2022-02-17T16:48:45.025+08:00[Asia/Shanghai]
executor-test-pool-3: task2 run end, time = 2022-02-17T16:48:45.947+08:00[Asia/Shanghai]
executor-test-pool-2: task3 run end, time = 2022-02-17T16:48:46.946+08:00[Asia/Shanghai]
executor-test-pool-1: task4 run end, time = 2022-02-17T16:48:47.945+08:00[Asia/Shanghai]
executor-test-pool-0: task5 run end, time = 2022-02-17T16:48:48.946+08:00[Asia/Shanghai]
通过输出结果可以看出,线程0 sleep时间为5s,最后一个print,线程4 sleep时间为1s,第一个print。
引用:
1.https://blog.csdn.net/windrui/article/details/101366444