举个例子:现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以在接收到每个请求之后,将其提交到CompletionService异步处理,等执行完毕之后,再返回给客户端。
package com.yf.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demonstrates {@link CompletionService}: simulated HTTP requests are submitted
 * for asynchronous processing and a dedicated consumer thread prints each
 * response id as soon as the corresponding task completes.
 *
 * <p>Call {@link #shutdown()} once no more requests will be submitted; it lets
 * every pending response be delivered, stops the consumer thread and releases
 * both thread pools so the JVM can exit.
 */
public class CompletionServiceTest {

    /** How long the consumer waits for a completed task before re-checking the shutdown flag. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Simulated processing time of a single request. */
    private static final long SIMULATED_LATENCY_MILLIS = 3000L;

    /** Pool that runs the simulated HTTP work started by {@link HTTPExecutor}. */
    private final ExecutorService threadPool = Executors.newCachedThreadPool();

    /** Pool backing the completion service; kept in a field so it can be shut down. */
    private final ExecutorService completionPool = Executors.newCachedThreadPool();

    private final CompletionService<Response> completionService =
            new ExecutorCompletionService<Response>(completionPool);

    /** Number of submitted requests whose result has not yet been taken by the consumer. */
    private final AtomicInteger pending = new AtomicInteger();

    /** Set by {@link #shutdown()}; the consumer exits once this is true and nothing is pending. */
    private volatile boolean shuttingDown;

    /** Thread that drains completed tasks and reports their responses. */
    private final Thread consumer;

    /**
     * Creates the demo and starts the consumer thread that reports completed responses.
     */
    public CompletionServiceTest() {
        consumer = new Thread() {
            @Override
            public void run() {
                consumeResponses();
            }
        };
        consumer.start();
    }

    /**
     * Consumer loop: takes completed futures and prints the id of each response
     * (this is where a real server would reply to the client). The loop ends
     * when shutdown was requested and every submitted request has been
     * reported, or when the thread is interrupted.
     */
    private void consumeResponses() {
        while (!shuttingDown || pending.get() > 0) {
            try {
                // Timed poll instead of take(): an idle consumer must still
                // notice a shutdown request rather than block forever.
                Future<Response> done =
                        completionService.poll(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                if (done == null) {
                    continue;
                }
                pending.decrementAndGet();
                Response resp = done.get();
                System.out.println(resp.getId());
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop consuming.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                System.out.println("Exception happened:" + e.getMessage());
            }
        }
    }

    /** Simple request model: an id plus a body. */
    class Request {
        private int rid;
        private String body;

        public int getRid() {
            return rid;
        }

        public void setRid(int rid) {
            this.rid = rid;
        }

        public String getBody() {
            return body;
        }

        public void setBody(String body) {
            this.body = body;
        }
    }

    /** Simple response model: the id of the originating request plus a body. */
    class Response {
        private int id;
        private String body;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getBody() {
            return body;
        }

        public void setBody(String body) {
            this.body = body;
        }
    }

    /** Simulates a slow HTTP call by running it on {@code threadPool}. */
    class HTTPExecutor {

        /**
         * Starts the simulated call for the given request.
         *
         * @param request the request to process
         * @return a future that yields the response once the simulated delay has elapsed
         */
        public Future<Response> execute(final Request request) {
            return threadPool.submit(new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    Response response = new Response();
                    // sleep is static: call it on the class, not on a Thread instance.
                    Thread.sleep(SIMULATED_LATENCY_MILLIS);
                    response.setId(request.getRid());
                    response.setBody("response");
                    return response;
                }
            });
        }
    }

    /**
     * Submits a request for asynchronous processing; its response is reported
     * by the consumer thread when the work completes.
     *
     * @param request the request to process
     * @throws java.util.concurrent.RejectedExecutionException if called after {@link #shutdown()}
     */
    public void submitHTTP(final Request request) {
        pending.incrementAndGet();
        try {
            completionService.submit(new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    return new HTTPExecutor().execute(request).get();
                }
            });
        } catch (RuntimeException e) {
            // The task was never accepted, so no result will ever arrive for it.
            pending.decrementAndGet();
            throw e;
        }
    }

    /**
     * Stops accepting new requests, waits until every pending response has been
     * reported by the consumer thread, and then shuts down both thread pools.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        shuttingDown = true;
        completionPool.shutdown();
        // The consumer exits only after all pending results were taken, which
        // implies every task that still needs threadPool has already finished.
        consumer.join();
        threadPool.shutdown();
    }

    /**
     * Sends 10 simulated HTTP requests, then shuts down once all responses were printed.
     */
    public static void main(String[] args) throws InterruptedException {
        CompletionServiceTest t = new CompletionServiceTest();
        for (int i = 0; i < 10; i++) {
            Request request = t.new Request();
            request.setRid(i);
            request.setBody("request");
            t.submitHTTP(request);
        }
        t.shutdown();
    }
}
可以简单查看一下CompletionService的唯一实现类ExecutorCompletionService的源码。
关键代码如下:
/**
 * Creates a completion service that runs tasks on the given executor and
 * collects finished tasks in a newly created {@code LinkedBlockingQueue}.
 *
 * @param executor the executor used to run submitted tasks
 * @throws NullPointerException if {@code executor} is null
 */
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    // Keep a typed reference only when the executor is an AbstractExecutorService;
    // otherwise aes stays null.
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

/**
 * Creates a completion service that runs tasks on the given executor and
 * uses the caller-supplied queue as its completion queue.
 *
 * @param executor the executor used to run submitted tasks
 * @param completionQueue the queue to use as the completion queue
 * @throws NullPointerException if {@code executor} or {@code completionQueue} is null
 */
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    // Same conditional cast as in the single-argument constructor.
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

/**
 * Wraps the task in a {@code RunnableFuture} and hands it to the executor.
 *
 * @param task the task to run
 * @return the future representing the submitted task
 * @throws NullPointerException if {@code task} is null
 */
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    // The future is executed through a QueueingFuture wrapper.
    // NOTE(review): QueueingFuture is not shown in this excerpt; presumably it
    // places f on completionQueue once the task is done — confirm in the full source.
    executor.execute(new QueueingFuture(f));
    return f;
}
通过ExecutorCompletionService的构造器可知,CompletionService依赖于一个单独的Executor来实际执行任务,并在内部管理了一个阻塞队列。在调用submit方法时,会创建一个新的RunnableFuture,然后异步执行该RunnableFuture;当其状态变为done后,会被添加到CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,队列为空时返回null)方法获取执行结果。