任何一种语言、框架,线程都是非常重要的一部分。要想实现异步就需要通过异步线程,但是频繁地创建销毁线程会带来较大的性能开销,而线程池就是为解决这一问题而出现的。简单来说线程池有以下几大优势:
- 降低资源开销:通过复用已经创建的线程,降低线程频繁创建、销毁带来的资源开销和性能损耗
- 快速启动任务:通过复用已有线程,快速启动任务
- 易于管理:线程池可以统一管理、分配、调优和监控
Java中的线程池是基于ThreadPoolExecutor
实现的,我们使用的ExecutorService
的各种线程池策略都是基于ThreadPoolExecutor
实现的,所以ThreadPoolExecutor
十分重要。要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor
。
1 创建线程池
首先来看下线程池的创建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize 核心线程池大小
- maximumPoolSize 线程池最大容量大小
- keepAliveTime 线程池空闲时,线程存活的时间
- TimeUnit 时间单位
- ThreadFactory 线程工厂
- BlockingQueue任务队列
- RejectedExecutionHandler 线程拒绝策略
ThreadPoolExecutor的基本流程如下:
- 当用户通过submit或者execute提交任务时,如果当前线程池中线程数小于
corePoolSize
,直接创建一个线程执行任务 - 如果当前线程数大于
corePoolSize
,则将任务加入到BlockingQueue
中 - 如果
BlockingQueue
也满了,在小于MaxPoolSize
的情况下创建线程执行任务 - 如果线程数大于等于
MaxPoolSize
,那么执行拒绝策略RejectedExecutionHandler
- 当线程池中超过
corePoolSiz
e线程,空闲时间达到keepAliveTime
时,关闭空闲线程
2 线程池状态
ThreadPoolExecutor
内部有多个状态,理解线程池内部状态对于理解线程池原理至关重要,所以接下来看下线程池的状态:
/*
* runState是整个线程池的运行生命周期状态,有如下取值:
* 1. RUNNING:可以新加线程,同时可以处理queue中的线程。
* 2. SHUTDOWN:不增加新线程,但是处理queue中的线程。
* 3. STOP 不增加新线程,同时不处理queue中的线程。
* 4. TIDYING 所有的线程都终止了(queue中),同时workerCount为0,那么此时进入TIDYING
* 5. terminated()方法结束,变为TERMINATED
* The runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 状态的转化主要是:
* RUNNING -> SHUTDOWN(调用shutdown())
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP(调用shutdownNow())
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING(queue和pool均empty)
* When both queue and pool are empty
* STOP -> TIDYING(pool empty,此时queue已经为empty)
* When pool is empty
* TIDYING -> TERMINATED(调用terminated())
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
runState的存储也值得一说,它并不是用一个单独的int或者enum进行存储,而是和线程数workerCount共同保存到一个原子量ctl中:
//利用ctl来保证当前线程池的状态和当前的线程的数量。ps:低29位为线程池容量,高3位为线程状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//设定偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//确定最大的容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//几个状态,用Integer的高三位表示
// runState is stored in the high-order bits
//111
private static final int RUNNING = -1 << COUNT_BITS;
//000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001
private static final int STOP = 1 << COUNT_BITS;
//010
private static final int TIDYING = 2 << COUNT_BITS;
//011
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态,取前三位
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前正在工作的worker,主要是取后面29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 通过调用
runStateOf()
方法获取当前线程池状态 - 通过调用
workerCountOf()
获取当前线程数
3 添加任务
向线程池添加任务一般通过execute
或者submit
方法添加,接下来通过execute
方法介绍下添加任务的原理:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前的Worker的数量小于核心线程池大小时,新建一个Worker线程执行该任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果worker数量已经大于核心线程数,尝试将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//recheck防止线程池状态的突变,如果突变,那么将reject线程,防止workQueue中增加新线程
reject(command);
else if (workerCountOf(recheck) == 0)//上下两个操作都有addWorker的操作,但是如果在workQueue.offer的时候Worker变为0,
//那么将没有Worker执行新的task,所以增加一个Worker.
addWorker(null, false);
}
//如果workQueue满了,那么这时候可能还没到线程池的maximum,所以尝试增加一个Worker
else if (!addWorker(command, false))
reject(command);//如果Worker数量到达上限,那么就拒绝此线程
}
可以看到execute
方法内部的核心逻辑在于添加工作线程addWorker
方法,所以接下来看下addWorker
:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs!=Shutdown || fistTask!=null || workCount.isEmpty
* 如果当前的线程池的状态>SHUTDOWN 那么拒绝Worker的add
* 如果=SHUTDOWN,那么此时不能新加入不为null的Task,如果在WorkCount为empty的时候不能加入任何 * 类型的Worker,
* 如果不为empty可以加入task为null的Worker,增加消费的Worker
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果当前线程数已经超标,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果线程数没有超标,则尝试通过CAS将workercount加一,如果成功直接跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
//如果失败,对状态进行double check,如果状态已改变则重试
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//接下来开始真正创建新的线程
//创建一个新的worker线程
Worker w = new Worker(firstTask);
Thread t = w.thread;
//获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
/**
* rs!=SHUTDOWN ||firstTask!=null
*
* 同样检测当rs>SHUTDOWN时直接拒绝减小Wc,同时Terminate,如果为SHUTDOWN同时firstTask不为null的时候也要Terminate
*/
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
//将新建的worker线程加入到workers数组中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
//新建线程开始执行
t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
//若此时线程池状态变为STOP,但当前线程并未interrupt,执行interrupt
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}
整个addWorker方法大致分为两大阶段:
- workerCount++:此时并不创建真正的线程,而仅仅是通过CAS操作把workerCount加一
- 创建线程:创建worker线程,将其加入到workers队列中,并根据状态对线程进行不同操作
3.1 workerCount++
workerCount
++操作主要涉及上述代码中标号retry
覆盖的代码,主要逻辑有以下三大部分:
-
根据线程池当前状态判断是否可以添加线程,如果不能添加直接返回false:
- 如果当前的线程池的状态>SHUTDOWN 那么拒绝Worker的add
- 如果=SHUTDOWN,那么此时不能新加入不为null的Task,如果在WorkCount为empty的时候不能加入任何类型的Worker
- 如果不为empty可以加入task为null的Worker,增加消费的Worker
-
根据当前worker数判断是否可以添加线程:
- 如果core为true,且当前worker数超过corePoolSize则不允许添加线程
- 如果core为fasle,且worker数超过maximumPoolSize则不允许添加线程
- 通过
compareAndIncrementWorkerCount
执行workerCount++操作,如果成功跳出循环;如果失败对当前状态进行doubleCheck,如果状态改变重新回到步骤1,如果状态不变重新回到步骤2
3.2 创建线程
创建线程的操作主要分为以下几个步骤:
- 创建一个worker线程实例
- 获取当前线程池锁进行互斥操作
- 对线程池状态再次进行判断。同样检测当rs>SHUTDOWN时直接拒绝减小Wc,同时Terminate,如果为SHUTDOWN同时firstTask不为null的时候也要Terminate
- 将线程加入线程队列中,释放锁
- 执行线程
- 若此时线程池状态变为STOP,但当前线程并未interrupt,执行interrupt
4 Worker
在第3节中看到添加的线程是通过Worker实现的,所以接下来看下Worker这个类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable var2) {
this.setState(-1);
this.firstTask = var2;
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
......
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
}
可以看到Worker实现了Runnable接口,并在内部维护了一个线程变量,看到这里其实Worker的大致逻辑明显了,无非是维护一个线程实例,执行添加的runnable实例。
4.1runWorker
在addWorker方法中,Worker实例创建好后会就会执行其thread变量的start方法,进而也就会执行Worker的run方法:
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
所以接下来看下ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
//标识线程是不是异常终止的
boolean completedAbruptly = true;
try {
//task不为null情况是初始化worker时,如果task为null,则去队列中取线程--->getTask()
while (task != null || (task = getTask()) != null) {
w.lock();
//获取woker的锁,防止线程被其他线程中断
clearInterruptsForTaskRun();//清楚所有中断标记
try {
beforeExecute(w.thread, task);//线程开始执行之前执行此方法,可以实现Worker未执行退出,本类中未实现
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现
}
} finally {
task = null;//运行过的task标null,方便GC
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理worker退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}
整个方法的逻辑比较简单:
- task不为null情况是初始化worker时,如果task为null,则去队列中取线程--->getTask()
- 获取woker的锁,防止线程被其他线程中断
- 线程开始执行之前执行beforeExecute方法,可以实现Worker未执行退出,本类中未实现
- 执行任务
- 线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现
- 处理worker退出的逻辑
4.2 getTask
接下来再来看看runWorker中的getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//当前状态为>stop时,不处理workQueue中的任务,同时减小worker的数量所以返回null,如果为shutdown 同时workQueue已经empty了,同样减小worker数量并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这段代码十分关键,首先看几个局部变量:
boolean timedOut = false;//主要是判断后面的poll是否要超时
boolean timed;//主要是标识着当前Worker超时是否要退出
wc > corePoolSize时需要减小空闲的Worker数,那么timed为true,但是wc <= corePoolSize时,不能减小核心线程数timed为false。
timedOut初始为false,如果timed为true那么使用poll取线程。如果正常返回,那么返回取到的task。如果超时,证明worker空闲,同时worker超过了corePoolSize,需要删除。返回r=null。则 timedOut = true。此时循环到wc <= maximumPoolSize && ! (timedOut && timed)
时,减小worker数,并返回null,导致worker退出。如果线程数<= corePoolSize,那么此时调用 workQueue.take(),没有线程获取到时将一直阻塞,直到获取到线程或者中断。