并发编程
Executor线程池原理和源码解析
ThreadPoolExecutor 默认线程池
线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释
corePoolSize
线程池中的核心线程数量,当提交一个任务时,就会创建一个新的线程执行当前任务。知道线程数量等于corePoolSize
maximumPoolSize
线程池允许的最大线程数量。如果当前阻塞队列满了,且继续提交任务,线程池会继续创建线程去执行新提交的任务,知道当前线程池的线程数量等于maximumPoolSize
keepAliveTime
线程池非核心线程所允许的最大空闲时间。当前线程池的线程数量大于corePoolSize时,如果没有新的任务提交,非核心线程不会立刻销毁,而是会等待keepAliveTime的时间
unit
keepAliveTime的单位
workQueue
用来保存等待被执行的任务的队列,任务必须实现Runable接口。JDK中提供如下几种阻塞队列
- ArrayBlockingQueue :基于数组结构的有界阻塞队列,按FIFO排序任务;
- LinkedBlockingQueue: 基于列表结构的阻塞队列,按FIFO排序任务,吞吐量高于ArrayBlockingQueue
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否正插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
- priorityBlockingQuene:具有优先级的*阻塞队列;
threadFactory
线程工厂,用来创建线程。Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程
时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
拒绝策略。当阻塞队列满了,线程池的数量等于maximumPoolSize,并且没有空闲的线程时,如果继续提交任务。线程池会就会启动拒绝策略,线程池提供了四种策略:
- AbortPolicy:默认策略,直接抛出异常
- CallerRunsPolicy: 将任务抛给提交任务的线程执行
- DiscardOldestPolicy:新的任务将被执行。丢弃阻塞队列中最先提交的任务
- DiscardPolicy:直接丢弃新提交的任务
上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
线程池监控
// 线程池已执行与未执行的任务总数
public long getTaskCount()
// 已完成的任务数
public long getCompletedTaskCount()
// 线程池当前线程数
public int getPoolSize()
//线程池中正在执行任务的线程数量
public int getActiveCount()
线程池原理
源码分析
execute 方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl记录着runState和workerCount
int c = ctl.get();
/**
workerCountOf方法取出低29位的值,表示当前线程数;如果当前线程数
小于corePoolSize,则新建一个线程放入线程池中,并把任务由该线程执行
*/
if (workerCountOf(c) < corePoolSize) {
/**
addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
如果为true,根据corePoolSize来判断;
如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
// 添加失败,重新获取ctl的值
c = ctl.get();
}
// 判断当前线程状态,如果当前线程是允许状态并且任务添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl的值
int recheck = ctl.get();
// 再次判断当前线程状态,如果不是running,移除上一步添加的任务
if (! isRunning(recheck) && remove(command))
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
reject(command);
else if (workerCountOf(recheck) == 0)
/**
获取线程池中的有效线程数,如果数量是0,则执行addWork方法
这里传入的参数表示
1.null,表示在线程池中创建一个线程,但不去启动
2.false,将线程池的有限线程数量的上限设置为maximumPoolSize,
添加线程时根据maximumPoolSize来判断;
*/
addWorker(null, false);
}
/**
如果执行到这里,有两种情况:
1. 线程池已经不是RUNNING状态;
2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
addWorker方法
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true 表示在新增线程时会判断当前活动线程数是否少于corePoolSize。false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。代码如下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
/* 如果rs>=shutdown,则表示此时不再接收新任务
接着判断以下3个条件,只要一个不满足,就返回false
1.rs==SHUDOWN ,这时表示关闭状态,不在接收新任务提交,阻塞队列中已经保存的任务可以继续执行
2.当前提交的任务为空
3.阻塞队列不为空
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/** wc表示当前线程数
wc >= CAPACITY,也就是ctl的低29位的最大值(二进制是29个一),返回false
这里的core是addWorker方法的第二个参数,如果是true,则用corePoolSize来进行比较
false则用maximumPoolSize来进行比较
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前的允许状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建worker对象
w = new Worker(firstTask);
// 每一个firstTask对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态
//如果is是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程
// 因为在SHUTDOWN时不会添加新的任务,但是还会执行workQueue在的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个hashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程池中每一个线程都被封装成一个worker对象,ThreadPool维护的其实就是一组Worker对象。
Worker类继承了AQS,并实现了Runnable接口。注意其中的firstTask和Thread属性:
- firstTask用来保存传入的任务;
- Thread是在调用构造方法时通过ThreadFactory来创建的线程。是用来处理任务的线程
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this)来新建一个线程。newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程。所以一个Worker对象在启动的时候会调用Worker类中的run方法
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReenTrantLock来实现呢?可以看到tryAcquire方法,他是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有任务处理,这时可以对该线程进行中断
- 线程池在执行shutdown方法或tryTerminate方法时,会调用interruptldWorkers方法来中断空闲的线程,interruptldWorkers方法会使用tryLocj方法来判断线程池是否时空闲状态
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReenTrantLock,它是可重入的,这样如果在任务调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时
就不应该被中断,看一下tryAquire方法:
protected boolean tryAcquire(int unused) {
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0
runWorker方法
在worker类中的run方法调用了runWorker方法来执行任务,runWoker方法的代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空,则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池正在停止,那么要保证当前线程是中断状态;
// 如果不是的话,则要保证当前线程不是中断状态;
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法的执行过程
- while循环不断的通过getTask()方法来获取任务;
- getTask()方法从阻塞队列中获取任务;
- 如果线程正在停止,那么要保证当前线程是中断状态,否则要保证当当前线程不是中断状态
- 调用task.run()执行实际的任务逻辑
- 如果task为null则跳出循环,执行processWorkerExit()方法
- runWoker方法执行完毕,也代表着Worker中的run方法执行完毕,线程销毁
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断
getTask方法
getTask方法是用来从阻塞队列中获取任务,代码如下
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
如果线程池状态rs>=SHUTDOWN,也就是非RUNNING状态,再进行一下判断:
1. rs >=SOP,线程池是否正在STOP
2. 阻塞队列是否为空
如果以上条件都满足,则将workerCount减1并返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
timed变量用于判断是否需要进行超时控制。
allowCoreThreadTimeOut 默认为false,也就是核心线程不允许进行超时
wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量,对于超过核心线程数量的这些
线程,需要进行超时控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时的执行了setMaximumPoolSize方法;
timed和timeOut如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1
如果减1失败,则返回重试
如果wc == 1 时,也就是说明当前线程是线程池中唯一的一个线程了
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根据timed来判断,如果为true,则通过阻塞队列的pol方法进行超时控制,
//如果keepAliveTime时间内没有获取到任务。则返回null;
// 否则通过take方法,如果这时队列为空,则take方法会阻塞知道队列不为空
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果r == null 说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timeOut为false并返回,循环重试
timedOut = false;
}
}
}
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。getTask 方法返回null 时, 在runWorker 方法中会跳出while 循环, 然后会执行processWorkerExit方法
processWokerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true,则说明线程之执行出现了异常,需要将workerCount减1
// 如果没有异常,说明getTask()方法已经对workerCount进行了减1的操作,这里就没必要再减了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
这个小节我们讲解了ThreadPoolExecutor 的创建方式以及通过解读源码的方式了解了ThreadPoolExecutor 的工作原理,下一小节我们将讲解Executor的另一个实现类ScheduledThreadPoolExecutor 定时线程池