本章主要学习ExecutorService接口。
1.ExecutorService接口概述
@since 1.5
ExecutorService接口是一个执行器,它可以终止任务。
ExecutorService接口还提供了返回Future接口的方法,这个方法可以用于追踪一个或多个异步任务的执行情况。
ExecutorService可以手动关闭,这种操作会导致它拒绝新的任务。
ExecutorService提供了两种方法用于关闭服务:
shutdown():不再接受新任务,允许之前已经提交的方法执行完毕,然后再关闭执行器。
shutdonwNow():阻止正在等待的任务开启,并且会试图停止正在执行的任务,然后关闭执行器。
在执行器终止时,它不会有正在执行的任务,不会有等待执行的任务,也不会再接收新提交的任务。
一个不再使用的ExecutorService应该关闭,以允许JVM回收其所占的资源。
submit()方法继承自Executor的方法,用于创建和返回一个Future对象,用于 取消任务执行 或 等待任务执行完毕。
invokeAny()方法和invokeAll()方法用于批量任务执行,然后等待一个或者全部任务执行完毕。
ExecutorCompletionService类可以用来对这些方法进行二次开发。
Executors工具类为此包中的ExecutorService提供了工厂方法。
使用示例
下面是一个web服务的示例程序,在线程池中对进来的请求提供服务。
它使用了预先配置的Executors的newFixedThreadPool来创建线程池。
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
下面演示了在ExecutorService中的两个关闭方法。
首先,通过第一个关闭方法shutdonw()来拒绝继续接收任务。
然后,在需要的情况下,通过第二个关闭方法shutdonwNow()取消任何拖延的任务。
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }
谨记一致性影响
一个线程中,提交一个Runnable或Callable任务到一个ExecutorService执行器之前的操作 happens-before 另一个线程中,这个任务执行后的任何后续操作。
即:
一个线程中,提交一个Runnable或Callable任务到一个ExecutorService执行器之前的操作 对 另一个线程中,这个任务执行后的任何后续操作 可见。
2.ExecutorService方法说明
ExecutorService相较于Executor,主要多了两个新特性:1、手动关闭;2、支持Callable和Future接口。
所以相应的也多了两类方法:
1.手动关闭
shutdown():不再接收新的任务,在当前已经提交的任务执行完毕后,关闭执行器。
shutdownNow():不再接收新的任务,尝试立刻关闭执行器,如果存在正在运行的任务,则终止这些任务。
isShutdown():执行器服务是否已经关闭。
isTerminated():在执行器服务执行shutdown()或者shutdownNow()方法之后,所有的任务是否已经完成。
awaitTermination(timeout,TimeUnit):阻塞等待所有的任务终止
如果线程池了执行shutdown()或者shutdownNow()方法,并且所有的任务都已经完成,则返回true。
如果等待时间超时,则返回false。
如果当前线程被interrupt,则抛出InterruptedException异常。
如果线程池未执行shutdown()或者shutdownNow()方法,则返回false。
2.支持Callable和Future接口
单任务执行
submit(Runnable task):提交一个Runnable任务。
submit(Runnable task, T result)提交一个Runnable任务,并将运行结果存放在result中。
submit(Callable<T> task):提交一个Callable任务。
批量任务执行
invokeAny:批量执行任务,获取任意一个最新执行完毕的结果(其他未执行完的任务不再执行)。
invokeAll:批量执行任务,获取所有执行完毕的Future对象。
3.实例练习一
练习目的:了解ExecutorService的两种手动关闭方式。
练习内容:
分别实现以下三种执行器的关闭方式:
方式1:Executor,无法主动关闭,根据创建线程池的方式的不同而采取不同的关闭策略
方式2:ExecutorService.shutdown(),不再接收新的任务,在当前已经提交的任务执行完毕后,关闭执行器。
方式3:ExecutorService.shutdownNow(),不再接收新的任务,立刻关闭执行器,如果存在正在运行的任务,则终止这些任务。
实例代码:
/** * 0 Executor 无法主动关闭 根据创建线程池的方式采取不同的关闭策略 * 1 ExecutorService shutdown 当正在运行的线程运行结束后,关闭线程池 * 2 ExecutorService shutdownNow 理解关闭线程池,正在运行的线程将被停止 */ int type = 2; switch (type) { case 0: //Executor 无法主动关闭 根据创建线程池的方式采取不同的关闭策略 Executor executor = Executors.newCachedThreadPool(); //Executor只有一个方法execute():提交Runnable任务 executor.execute(() -> { System.out.println(Thread.currentThread().getName() + "--- Executor begin ..."); try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "--- Executor end ."); } catch (InterruptedException e) { //e.printStackTrace(); System.out.println(Thread.currentThread().getName() + "--- Executor is interrupted ."); } }); break; case 1: //ExecutorService shutdown 当正在运行的线程运行结束后,关闭线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //ExecutorService通过submit提交任务 executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "--- ExecutorService begin ..."); try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "--- ExecutorService end ."); } catch (InterruptedException e) { //e.printStackTrace(); System.out.println(Thread.currentThread().getName() + "--- ExecutorService is interrupted ."); } }); executorService.shutdown(); break; case 2: //ExecutorService shutdownNow 理解关闭线程池,正在运行的线程将被停止 ExecutorService executorService1 = Executors.newCachedThreadPool(); //ExecutorService通过submit提交任务 executorService1.submit(() -> { System.out.println(Thread.currentThread().getName() + "--- ExecutorService begin ..."); try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "--- ExecutorService end ."); } catch (InterruptedException e) { //e.printStackTrace(); System.out.println(Thread.currentThread().getName() + "--- ExecutorService is interrupted ."); } }); executorService1.shutdownNow(); break; default: break; } //其他关闭相关的方法 Thread.sleep(1000); System.out.println(); //关于线程池关闭的其他方法 ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(()->{System.out.println();}); //executorService.isShutdown():线程池服务是否已经关闭 executorService.isShutdown(); //executorService.isTerminated():在线程池服务执行shutdown()或者shutdownNow()方法之后,所有的任务是否已经完成 //如果没有执行shutdown()或者shutdownNow()方法,则永远返回false executorService.isTerminated(); //阻塞等待所有的任务终止 //如果等待时间超时,则返回false //如果当前线程被interrupt,则抛出InterruptedException异常 //如果线程池了执行shutdown()或者shutdownNow()方法,并且所有的任务都已经完成,则返回true //如果线程池未执行shutdown()或者shutdownNow()方法,则永远返回false executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
方式1(Executor)运行结果:
pool-1-thread-1--- Executor begin ... pool-1-thread-1--- Executor end .
执行器会在线程运行完之后,再等待60s之后再关闭。
方式2(shutdown)运行结果:
pool-1-thread-1--- ExecutorService begin ... pool-1-thread-1--- ExecutorService end .
执行器会在线程运行完之后再关闭。
方式3(shutdownNow)运行结果:
pool-1-thread-1--- ExecutorService begin ... pool-1-thread-1--- ExecutorService is interrupted .
执行器会在线程运行过程中关闭。
4.实例练习二
关于单任务执行,在前面的不少章节中(例如:Java并发03)都已经涉及,不再赘述。
练习目的:了解ExecutorService的两种批量任务执行方式:
invokeAny:最先完成任务。
批量执行任务,当任意一个任务最先完成时,返回这个任务的执行结果。
返回值是一个执行结果,类型为泛型的实际类型(如任务类型为Callable<Integer>,则返回值为Integer类型)。
invokeAll:全部完成任务。
批量执行任务,当所有的任务都完成之后,返回这些任务的Future对象。
返回值是一个Future集合。
练习内容:
定义一个任务(Callable)集合,其中的每个任务实现随机取数。
定义一个结果(Future)集合,与任务(Callable)集合相对应。
实现1:批量执行任务集合,当所有的任务执行完毕之后,输出所有的执行结果。
实现2:批量执行任务集合,输出第一个执行完毕的任务的执行结果。
实例代码:
//关于Callable和Future的简单用法,前面已经学习 //下面进行Callable和Future的进阶用法--批量任务:最先完成任务和全部完成任务 List<Callable<Integer>> callableList = new ArrayList<>(); for (int i = 0; i < 5; i++) { callableList.add(new Callable<Integer>() { @Override public Integer call() throws Exception { Integer result = RandomUtils.nextInt(100,2000); Thread.sleep(result); System.out.println("init " + result); return result; } }); } //全部任务完成 List<Future<Integer>> futureList = executorService.invokeAll(callableList); System.out.println("等待所有的任务完成之后,才会得到结果集。"); futureList.forEach(future -> { try { System.out.println("result " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); Thread.sleep(1000); System.out.println(); //最先完成任务 Integer firstResult = executorService.invokeAny(callableList); System.out.println("得到一个最先得到的结果,立即返回;后面的任务不再运行。"); System.out.println("result " + firstResult);
运行结果:
init 664 init 1026 init 1064 init 1251 init 1494 等待所有的任务完成之后,才会得到结果集。 result 1494 result 1026 result 1251 result 1064 result 664 init 181 得到一个最先得到的结果,立即返回;后面的任务不再运行。 result 181
注意:invokeAny会导致未执行完成的任务不再执行。