[Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors ...
摘要
Java 和其他平台相比最大的优势在于它可以很好的利用资源来进行并行计算。确实,在 JVM 上可以轻而易举地在后台执行一段代码,并在需要使用它的时候消费计算的结果。同时,它也让开发者可以更好的利用现代计算机硬件所带来计算能力。
但是,想让计算正确并不容易,或许对于开发者最大的挑战是编写一个总是能运行正确的程序,而不是我们熟悉的 “在我机器上” 是正确的。
这篇文章会看看 Executor
里提供的不同选择。
正文
Java Executors 详解
简言之,Executor 是一个接口,它旨在将任务的声明与实际计算解耦。
public interface Executor {
void execute(Runnable command);
}
它以 Runnable 实例的形式接受任务。线程会在某个时间点获取任务并执行 Runnable::run
方法。但是,真正有难度的通常是如何选择将要使用的 Executor 实现。在 Executors 类中已经有一些可供使用的默认实现。让我们来看看它们是什么以及何时选择。
总体上说,当选择供后台计算的 Executor 时,通常可以考虑 3 个主要的问题:
- 默认希望有多少个线程并行执行?
- 当所有可用线程都处于忙碌状态时,Executor 会如何处理一个提交的任务?(如:通常它要么使用更多的线程或者要么将任务加入到队列中)
- 是否希望限制任务队列的大小,如果队列满了会怎样?
如果希望显式地控制这些问题的答案,可以使用 JDK 提供的灵活的 API 来创建自己的 ThreadPoolExecutor 。ThreadPoolExecutor 的构造器显式地要求提供问题的答案:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
以下描述了这些参数的含义以及它们是如何回答以上问题的:
- int corePoolSize - 线程池初始启动时线程的数量
- int maximumPoolSize - 当核心线程繁忙时,希望线程池增长的最大值
- long keepAliveTime, TimeUnit 单位 - 如果线程空闲不再工作,是否希望关闭它?线程池需要等待多久才关闭它?
- BlockingQueue workQueue - 任务如果不能立即处理是如何处理的?是否希望限制任务队列的大小?
- RejectedExecutionHandler handler - 如果 Executor 不接受任务会怎样?是否应该抛出异常?调用方是否应该自行处理任务?
下面列出了由工厂方法 Executors 创建的不同 ExecutorServices 的差异。希望对在面临如上问题并做出选择时有帮助。
newFixedThreadPool(int nThread) - n 个线程会同时进行处理,当线程池满后,新的任务会被加入到大小没有限制的队列中。比较适合 CPU 密集型的任务。
newWorkStealingPool(int parallelism) - 会更加所需的并行层次来动态创建和关闭线程。它同样会试图减少任务队列的大小,所以比较适于高负载的环境。同样也比较适用于当执行的任务会创建更多任务,如递归任务。
newSingleThreadExecutor() - 创建一个不可配置的 newFixedThreadPool(1)
,所以一个线程会执行所有的任务。它适用于明确知道可预测性以及任务需要按顺序执行的情况。
newCachedThreadPool() - 不会将任务加入队列。可以将它看成一个最大值为 0 的队列。如果当前线程都处于繁忙状态,它会创建另外一个线程来执行任务。它有时也会重用线程。它适用于防止 DOS 攻击。缓存线程池的问题在于它不知道该合适停止创建线程。设想需要执行大量计算的任务时,如果将任务提交给 Executor ,更多的线程会消耗更多的 CPU,同时每个任务的执行也会花更长时间。这是个多米诺效应,有更多的任务会被记录下。这样越来越多的线程会被创建,而任务的执行会更慢。很难解决这个负反馈环的问题。
所以对于大多数情况, Executors::newFixedThreadPool(int nThreads) 是当我们想要使用线程池时首先考虑的选择对象。对于计算密集型的任务它通常能提供近于最优的吞吐量,对于 IO 密集型的任务也不会使任何问题变得更糟。至少如果在我们使用这些 Executor 遇到问题并进行性能调优时,不会毫无头绪。
ForkJoinPool 与 ManagedBlockers
当然 JVM 上有一个默认选择的 Executor:通用的 ForkJoinPool,它是由 JVM 预设的用来并行处理流以及执行类似 CompletableFuture::supplyAsync 的任务。
听起来很美?预设的,随时随地可用,最先进的线程池。还希望哪些其他的特性?这里有个忠告,如果有件事情听起来太好了,那么一定需要擦亮眼睛。ForkJoinPool 简直太好了,除了它是通用的 common ,(即被整个 JVM 共享),它可以被在同一 JVM 进程内的所有、任何组件使用。
如果不小心让不合适的任务污染了它,可能会让整个 JVM 进程受到影响。所以如果不小心让 common 池中的工作线程阻塞,可能是没有正确地使用它。
让我们来看看如何让它变得更好。ForkJoinPool 设计的初衷是为了解决有些任务会阻塞工作线程的情况,所以它提供了处理这种阻塞的 API 。
让我们欢迎 — ManagedBlocker - 可以用它来给 ForkJoinPool 传递信号扩展它的并行能力,从而补偿潜在可能被阻塞的工作线程。
假设我们有一个 Call 实例,与 Retrofit 2 Call 类似,它包含所有查询所需要的 endpoint 信息,以及如何将结果转换成对象的信息。开始使用 Retrofit 2,尽管这篇文章主要是写 Android ,但总体的概念与在 JVM 上使用 Retrofit 是一样的。它提供了一套很好的 HTTP 请求的 API 。
class WS<E> implements ManagedBlocker {
private final Call<E> call;
volatile E item = null;
public WS(Call<E> call) {
this.call = call;
}
public boolean block() throws InterruptedException {
if (item == null)
item = call.execute().body();
return true;
}
public boolean isReleasable() {
return item != null;
}
public E getItem() { // call after pool.managedBlock completes
return item;
}
}
现在,当我们想要调用 Call::execute 的时候,我们需要保证它是通过 ForkJoinPool::managedBlock 方法进行调用的
WS ws = new WS(call);
ForkJoinPool.managedBlock(ws);
ws.getItem(); // obtain the result
显然,当在 FJP 以外运行的时候它毫无意义,在线程池上运行时才有意义。FJP 会在线程出现阻塞时生成多的工作线程。需要提醒的是,这并不是银弹,相反,很有可能是错误的,因为 ManagedBlocker API 是用来处理可能被阻塞的 synchronizer 对象的。这里我们是在处理一个阻塞网络调用,它可以处理当我们查询 4 个 urls FJP 计算资源被耗尽的情况。
总结
本篇文章我看了 Executors 类提供给我们的选择,以及何时使用各个 Executor 的策略。对于 CPU 密集型的任务,newFixedThreadPool 可以适用大多数场景,除非明确知道另外一个选择更好。但是,对于 IO 密集型的任务,并不简单。可以通过将 IO 调用包装到 ManagedBlocker 里并使用 ForkJoinPool 来增强它内部的并行能力。