ExecutorCompletionService用法简介

一、ExecutorCompletionService

通常在执行一批需要返回结果的任务时,我们可以使用线程池来提高程序运行效率,通过线程池的 submit(Callable task) 不断提交异步任务,并将 Future 保存下来,之后遍历 Future,调用 get() 方法获取结果。

虽然任务都是异步执行的,但是 get Future 结果是阻塞的。例如第一个 future 需要计算5s才能返回结果,但是其他 future 不到1s就会返回计算结果,需要等待第一个 future 结果返回才能 get 到其他 future 的结果,这就白白浪费了很多时间。

此时就可以使用 ExecutorCompletionService,它实现了 CompletionService 接口。它的内部有一个先进先出的阻塞队列,用于保存执行结束的 future,通过调用 ExecutorCompletionService 的 take 方法,就可以获取到第一个已经执行完成的 Future,之后再调用 future 的 get 方法,就可以获取到最终的结果。

动动发财小手,关注 + 点赞 + 收藏不迷路。

二、Demo演示

ExecutorCompletionService 示例代码如下:

package thread;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.time.ZonedDateTime;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: tinker
 * @Date: 2022/02/17 13:21
 */
public class ExecutorCompletionServiceDemo {

    private static final String THREAD_FACTORY_NAME_FORMAT = "-pool-%d";
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 10;
    private static final int KEEP_ALIVE_TIME = 60;
    private static final int LINKED_BLOCKING_QUEUE_CAPACITY = 1000;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("executor-test" + THREAD_FACTORY_NAME_FORMAT)
                .build();

        Executor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(LINKED_BLOCKING_QUEUE_CAPACITY),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        CompletionService<ExecutorCompletionServiceDemo.Result> completionService = new ExecutorCompletionService(executor);
        for (int i = 1; i <= 5; i++) {
            completionService.submit(new Task("task" + i, i));
        }

        System.out.println("print result");

        for (int i = 1; i <= 5; i++) {
            Future<Result> future = completionService.take();
            System.out.println(future.get().result);
        }

        System.exit(0);
    }

    public static class Task implements Callable<ExecutorCompletionServiceDemo.Result> {

        private String taskName;
        private long sleepSeconds;

        Task(String taskName, long sleepSeconds) {
            this.taskName = taskName;
            this.sleepSeconds = sleepSeconds;
        }

        @Override
        public ExecutorCompletionServiceDemo.Result call() throws Exception {
            Thread.sleep(sleepSeconds * 1000);
            return new Result(Thread.currentThread().getName() + ": " + taskName + " run end, time = " + ZonedDateTime.now());
        }
    }

    public static class Result {
        private String result;

        Result(String result) {
            this.result = result;
        }
    }

}


输出如下:

print result
executor-test-pool-4: task1 run end, time = 2022-02-17T16:48:45.025+08:00[Asia/Shanghai]
executor-test-pool-3: task2 run end, time = 2022-02-17T16:48:45.947+08:00[Asia/Shanghai]
executor-test-pool-2: task3 run end, time = 2022-02-17T16:48:46.946+08:00[Asia/Shanghai]
executor-test-pool-1: task4 run end, time = 2022-02-17T16:48:47.945+08:00[Asia/Shanghai]
executor-test-pool-0: task5 run end, time = 2022-02-17T16:48:48.946+08:00[Asia/Shanghai]

通过输出结果可以看出,线程0 sleep时间为5s,最后一个print,线程4 sleep时间为1s,第一个print。

引用:
1.https://blog.csdn.net/windrui/article/details/101366444

上一篇:PySpark之Spark的内核调度


下一篇:20220210 java.util.Queue