java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。
线程池简介
什么是线程池
线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。
为什么要使用线程池
因为 Java 中创建一个线程,需要调用操作系统内核的 API,操作系统要为线程分配一系列的资源,成本很高,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
使用线程池就能很好地避免频繁创建和销毁,所以有必要引入线程池。使用 线程池的好处 有以下几点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
线程池状态和工作线程数量
这本来是两个不同的概念,但是在ThreadPoolExecutor中我们使用一个变量ctl来存储这两个值,这样我们只需要维护这一个变量的并发问题,提高运行效率。
/**
* 记录线程池中Worker工作线程数量和线程池的状态
* int类型是32位,它的高3位,表示线程池的状态,低29位表示Worker的数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示线程池中创建Worker工作线程数量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
怎么使用一个变量ctl存储两个值呢?
就是利用int变量的高3位来储存线程池状态,用int变量的低29位来储存工作线程数量。
这样就有两个需要注意的地方:
- 工作线程数量最大值不能超过int类型29位的值CAPACITY 即0b0001.....1(29位1)
- 因为线程池状态都是高3位储存的,所以工作线程数量不会影响状态值大小关系。
线程池状态
// 高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位值是001
private static final int STOP = 1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态分析:
① RUNNING:运行状态。接受新任务,并且也能处理阻塞队列中的任务。
② SHUTDOWN:关闭状态。不接受新任务,但可以处理阻塞队列中的任务。
- 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
- finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。
③ STOP:停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。
④ TIDYING:整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。
⑤ TERMINATED:已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:
- 线程池不是 RUNNING 状态;
- 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
- 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
- workerCount 为 0;
- 设置 TIDYING 状态成功。
操作ctl的方法
获取线程池的状态
/**
* 获取线程池的状态。因为线程池的状态是使用高3位储存,所以屏蔽低29位就行了。
* 所以就c与~CAPACITY(0b1110..0)进行&操作,屏蔽低29位的值了。
* 注意:这里是屏蔽低29位的值,而不是右移29位。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
获取工作线程数量
/**
* 获取线程池中Worker工作线程的数量,
* 因为只使用低29位保存Worker的数量,只要屏蔽高3位的值就行了
* 所以就c与CAPACITY(0b0001...1)进行&操作,屏蔽高3位的值了。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
合并ctl的值
/**
* 得到ctl的值。
* 接受两个参数rs和wc。rs表示线程池的状态,wc表示Worker工作线程的数量。
* 对于rs来说我们只需要高3位的值,对于wc来说我们需要低29位的值。
* 所以我们将rs | wc就可以得到ctl的值了。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
其他方法
// 因为RUNNING状态高三位是111,所以状态值rs与工作线程数量ws相与的结果值c一定是个负数,
// 而其他状态值都是大于等于0的数,所以c是负数,那么表示当前线程处于运行状态。
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS函数将ctl值自增
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS函数将ctl值自减
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS函数加循环方法这种乐观锁的方式,解决并发问题。
* 保证使ctl值减一
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
重要成员变量
// 记录线程池中Worker工作线程数量和线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务线程的阻塞队列,因为是阻塞队列,所以它是并发安全的
private final BlockingQueue<Runnable> workQueue;
// 独占锁,用来保证操作成员变量的并发安全问题
private final ReentrantLock mainLock = new ReentrantLock();
// 等待线程池完全终止的条件Condition,
private final Condition termination = mainLock.newCondition();
//----------------- 需要mainLock来保证并发安全-------------------------//
// 线程池中工作线程集合。Worker中持有线程thread变量
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程池中曾拥有过的最大工作线程个数
private int largestPoolSize;
// 线程池完成过任务的总个数
private long completedTaskCount;
//----------------- 需要mainLock来保证并发安全-------------------------//
// 创建线程的工厂类
private volatile ThreadFactory threadFactory;
// 当任务被拒绝时,用来处理这个被拒绝的任务
private volatile RejectedExecutionHandler handler;
// 工作线程空闲的超时时间keepAliveTime
private volatile long keepAliveTime;
// 是否允许核心池线程超时释放
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心池线程个数
private volatile int corePoolSize;
// 线程池最大的线程个数
private volatile int maximumPoolSize;
成员变量说明:
- mainLock:使用mainLock来保证会发生变化成员变量的并发安全问题。会发生的成员变量有5个:ctl、workQueue、workers、largestPoolSize和completedTaskCount。但是其中ctl和workQueue的类型本身就是多线程安全的,所以不用mainLock锁保护。
- termination:等待线程池完全终止的条件,如果线程池没有完全终止,调用它的awaitNanos方法,让线程等待。当线程池完全终止后,调用它的signalAll方法,唤醒所有等待termination条件的线程。
- workers:记录所有的工作线程Worker
- workQueue:记录所有待执行的任务。使用阻塞队列BlockingQueue,可以在队列为空时,线程等待,队列有值时,唤醒等待的线程。
- largestPoolSize:线程池中曾拥有过的最大工作线程个数
- completedTaskCount:线程池完成过任务的总个数
- threadFactory:创建线程的工厂类
- handler:当任务被拒绝时,用来处理这个被拒绝的任务
- keepAliveTime:工作线程允许空闲的超时时间,一般都是针对超过核心池数量的工作线程。
- allowCoreThreadTimeOut: 是否允许核心池的工作线程超时释放。
- corePoolSize:线程池核心池线程个数。
- maximumPoolSize: 线程池最大的线程个数。
构造方法
ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
参数说明:
1、corePoolSize:核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:
- 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
- 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize。
2、maximumPoolSize:最大线程数量。
- 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
值得注意的是:如果使用了*的任务队列这个参数就没什么效果。
3、keepAliveTime:线程保持活动的时间(非核心线程的存活时间)。
当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime。
所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
4、unit:keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
5、workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
(1)ArrayBlockingQueue - 有界阻塞队列。
此队列是基于数组的先进先出队列(FIFO)。
此队列创建时必须指定大小。
(2)LinkedBlockingQueue - *阻塞队列。
此队列是基于链表的先进先出队列(FIFO)。
如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE。
吞吐量通常要高于 ArrayBlockingQueue。
使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是*队列。
Executors.newFixedThreadPool 使用了这个队列。
(3)SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
吞吐量通常要高于 LinkedBlockingQueue。
Executors.newCachedThreadPool 使用了这个队列。
(4)PriorityBlockingQueue - 具有优先级的*阻塞队列。
6、threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
7、handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
DiscardPolicy - 丢弃任务,但不抛出异常。
DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
CallerRunsPolicy - 只用调用者所在的线程来运行任务。
如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。
常用方法