ThreadPoolExecutor 分析

一、 演示

public class ThreadPoolTest {
static class MyThread implements Runnable {
private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { try {
Thread.sleep(1000); System.out.println(name + " finished job!") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public static void main(String[] args) {
// 创建线程池,为了更好的明白运行流程,增加了一些额外的代码
// BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
// BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>(); // AbortPolicy/CallerRunsPolicy/DiscardOldestPolicy/DiscardPolicy
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 5, TimeUnit.SECONDS,
queue, new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10; i++) {
System.out.println("当前线程池大小[" + threadPool.getPoolSize() + "],当前队列大小[" + queue.size() + "]"); threadPool.execute(new MyThread("Thread" + i));
}
// 关闭线程池
threadPool.shutdown();
}
}

There are three general strategies for queuing:

  1. Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
  2. Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
  3. Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

排队有三种通用策略:

  1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁(A线程需要先运行,然后是B线程,SynchronousQueue可以保证这个执行顺序)。直接提交通常要求* maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。

  2. *队列。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用*队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。

  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如, I/O密集型 ),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

二、构造函数及其参数含义

1. 当线程数 < 核心线程数时,创建线程

2. 线程数 >= 核心线程数,

2.1 任务队列未满时,将任务放入任务队列。

   2.2 任务队列已满

    2.2.1 线程数 < 最大线程数,创建线程

    2.2.2 线程数 > 最大线程数,抛出异常,拒绝任务

public ThreadPoolExecutor(int corePoolSize,    //核心线程的数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler // 当任务无法执行时的处理器
) {...}

ThreadPoolExecutor 分析

执行判断

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
//1.当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心池已满,但任务队列未满,添加到队列中,所以:如果是*队列,添加的任务超过核心线程后,不会创建非核心线程,maximumPoolSize参数无效
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) //如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//3.核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}

不同类型线程池构造

    public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
    public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
    public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

三、如何确定系统的处理能力、线程数

1. 处理能力

利特尔法则(Little's law),在一个稳定的系统中,长时间观察到的平均顾客数量L = 长时间观察到的有效到达速率λ * 平均每个顾客在系统中花费的时间W,即L = λW。

假定我们所开发的并发服务器,并发的访问速率是:1000客户/分钟,每个客户在该服务器上将花费平均0.5分钟,根据little's law规则,

在任何时刻,服务器将承担1000×0.5=500个客户量的业务处理。假定过了一段时间,由于客户群的增大,并发的访问速率提升为2000客户/分钟。

在这样的情况下,我们该如何改进我们系统的性能?

根据little's law规则,有两种方案:

第一:提高服务器并发处理的业务量,即提高到2000×0.5=1000。 或者

第二:减少服务器平均处理客户请求的时间,即减少到:500 / 2000 = 0.25。

2. 线程数

  • 如果是CPU密集型应用,则线程池大小设置为N+1

  • 如果是IO密集型应用,则线程池大小设置为2N+1

是否使用线程池就一定比使用单线程高效呢?

答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:

  • 多线程带来线程上下文切换开销,单线程就没有这种开销

更本质的原因在于:Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。

所以即使有上面的简单估算方法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是IO密集型或者是CPU密集型或者是纯内存操作)

和硬件环境(CPU、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。

四、Worker

    /**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

Worker是ThreadPoolExecutor的静态内部类,主要是对Thread对象的包装,一个Worker内部有一个Thread对象。

Worker继承自AQS来实现一个简单互斥锁,每一个任务的执行前和执行后都会分别获取和释放一次锁, 这样做是为了让线程执行任务时屏蔽中断操作。

那么为什么不用ReentrantLock呢?其实是为了避免在任务执行中修改线程池的变量和状态,不能用可重入锁。

参考:

Java线程池机制

上一篇:Jmeter中连接Oracle报错Cannot create PoolableConnectionFactory


下一篇:《Spring Boot官方指南》(一)Spring Boot 文档