一篇文章搞懂 CompletionService

前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

WHY

如果向 Executor 提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的 Future ,然后反复使用 get 方法,同时将参数 timeout 指定为 0 ,从而通过轮询来判断任务是否完成。

这种方法虽然可行,但却有些繁琐。

幸运的是,还有一种更好的方法:

完成服务( CompletionService )

WHAT

CompletionService 将 Executor 和 BlockingQueue 的功能融合在一起。

你可以将 Callable 任务提交给它来执行,然后使用类似于队列操作的 take 和 poll 等方法来获得已完成的结果,而这些结果会在完成时将被封装为 Future 。

ExecutorCompletionService 实现了 CompletionService ,并将计算部分委托给一个 Executor。

ExecutorCompletionService 的实现非常简单。

在构造函数中创建一个 BlockingQueue 来保存计算完成的结果。

当计算完成时,调用 Future - Task 中的 done 方法。

当提交某个任务时,该任务将首先包装为一个 QueueingFuture ,这是 Futuretask 的一个子类,然后再改写子类的 done 方法,并将结果放入 BlockingQueue 中。

take 和 poll 方法委托给了 BlockingQueue ,这些方法会在得出结果之前阻塞。

QueueingFuture 源码(JDK8)


    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

实战

使用 CompletionService 实现页面渲染器,可以通过 CompletionService 从两个方面来提高页面渲染器的性能:

缩短总运行时间以及提高响应性。

为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程:这将减少下载所有图像的总时间。

此外,通过从 CompletionService 中获取结果以及使毎张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。 如下所示:

public class Renderer {
	private final ExecutorService executor;

	Renderer(ExecutorService executor) {
		this.executor = executor;
	}

	void renderPage(CharSequence source) {
		List<ImageInfo> info = scanForImageInfo(source);
		CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
		for (ImageInfo imageInfo : info) {
			completionService.submit(new Callable<ImageData>() {
				@Override
				public ImageData call() throws Exception {
					return imageInfo.downloadImage();
				}
			});
			renderText(source);
		}

		try {
			for (ImageInfo imageInfo : info) {
				Future<ImageData> future = completionService.take();
				ImageData imageData = future.get();
				renderImage(imageData);
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			throw new RuntimeException(e);
		}
	}
}

上一篇:springboot定时任务的启停


下一篇:C++多线程编程第九讲--future其他成员函数、shared_future、atomic