接上一篇博客 https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501
6、Worker 类
下面的是在 Worker 类上的官方的一段注释:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
从注释中我们可以了解到 Worker 类是内部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。从下面的代码也可以看出:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker 类主要维护正在运行任务的线程的中断控制状态,以及其他次要的记录。
这个类继承了AbstractQueuedSynchronizer
类,以简化获取和释放锁(该锁作用于每个任务执行代码)的过程。这样可以防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。
我们实现了一个简单的不可重入互斥锁
,而不是使用可重入锁(ReentrantLock),因为我们不希望工作任务在调用setCorePoolSize之类的池控制方法时能够重新获取锁。另外,为了在线程真正开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。
我们来看一下它的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker 这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker 执行任务的模型如下:
关于 addWorker 方法的介绍在上一篇博客中有介绍,需要的看【https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501】
我们来看一下 Worker 类的完整的代码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// Worker持有的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 初始化的任务,可以为null
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 设置AQS的同步状态
// - state:锁状态,-1为初始值,0为unlock状态,1为lock状态
setState(-1); // inhibit interrupts until runWorker 在调用runWorker前,禁止中断
this.firstTask = firstTask;
// 线程工厂创建一个线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker
*
* 将主运行循环委托给外部 runWorker
*/
public void run() {
runWorker(this); // runWorker()是ThreadPoolExecutor的方法
}
// Lock methods
//
// The value 0 represents the unlocked state. 0代表“没被锁定”状态
// The value 1 represents the locked state. 1代表“锁定”状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试获取锁的方法
* - 重写 AQS 的 tryAcquire()
*/
protected boolean tryAcquire(int unused) {
// 判断原值为0,且重置为1,所以state为-1时,锁无法获取。
// 每次都是 0->1 ,保证了锁的不可重入性
if (compareAndSetState(0, 1)) {
// 设置exclusiveOwnerThread=当前线程
// 独占模式
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 尝试释放锁
* - 不是state-1,而是置为0
*/
protected boolean tryRelease(int unused) {
// 清除当前占用的线程
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中断(如果运行)
* shutdownNow时会循环对worker线程执行
* 且不需要获取worker锁,即使在worker运行时也可以中断
*/
void interruptIfStarted() {
Thread t;
// 如果state>=0、t!=null、且t没有被中断
// new Worker()时state==-1,说明不能中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
关于 AQS 的描述看博主的另外一篇文章 【https://blog.csdn.net/qq_43605444/article/details/121705312?spm=1001.2014.3001.5501】
7、runWorker 方法
- 方法说明:可以说,runWorker(Worker w) 是线程池中真正处理任务的方法,前面的execute() 和 addWorker() 都是在为该方法做准备和铺垫。
- 参数说明:
- Worker w:封装的Worker,携带了工作线程的诸多要素,包括 Runnable(待处理任务)、lock(锁)、completedTasks(记录线程池已完成任务数)
- 下面是具体的代码分析:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
w.unlock(); // allow interrupts
// 线程退出的原因,true是任务导致,false是线程正常退出
boolean completedAbruptly = true;
try {
// 当前任务和从任务队列中获取的任务都为空,方停止循环
while (task != null || (task = getTask()) != null) {
// 上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/**
* 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
* 条件1:线程池状态>=STOP,即STOP或TERMINATED
* 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,
* 即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,
* 使线程池处于STOP或TERMINATED),
* 条件1与条件2任意满足一个,且wt不是中断状态,则中断wt,否则进入下一步
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 当前线程调用interrupt()中断
wt.interrupt();
try {
// 执行前(空方法,由子类重写实现)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后(空方法,由子类重写实现)
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务数+1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理worker的退出
processWorkerExit(w, completedAbruptly);
}
}
runWorker 方法的执行过程是:
- while循环不断地通过getTask()方法获取任务。
- getTask()方法从阻塞队列中取任务。
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
8、getTask 方法
- 方法说明:由函数调用关系图可知,在ThreadPoolExecutor类的实现中,Runnable getTask() 方法是为 void runWorker(Worker w)方法服务的,它的作用就是在任务队列(workQueue)中获取 task(Runnable)。
private Runnable getTask() {
// 最新一次poll是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 条件1:线程池状态SHUTDOWN、STOP、TERMINATED状态
* 条件2:线程池STOP、TERMINATED状态或workQueue为空
* 条件1与条件2同时为true,则workerCount-1,并且返回null
* 注:条件2是考虑到SHUTDOWN状态的线程池不会接受任务,但仍会处理任务
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/**
* 下列两个条件满足任意一个,则给当前正在尝试获取任务的工作线程设置阻塞时间限制
*(超时会被销毁?不太确定这点),否则线程可以一直保持活跃状态
* 1.allowCoreThreadTimeOut:当前线程是否以keepAliveTime为超时时限等待任务
* 2.当前线程数量已经超越了核心线程数
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程
// 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时)
// 条件2:wc > 1或任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 移除工作线程,成功则返回null,不成功则进入下轮循环
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 执行到这里,说明已经经过前面重重校验,开始真正获取task了
try {
// 如果工作线程阻塞时间受限,则使用poll(),否则使用take()
// poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// r不为空,则返回该Runnable
if (r != null)
return r;
// 没能获取到Runable,则将最近获取任务是否超时设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 响应中断,进入下一次循环前将最近获取任务超时状态置为false
timedOut = false;
}
}
}
9、processWorkerExit 方法
- 方法说明:processWorkerExit(Worker w, boolean completedAbruptly),执行线程退出的方法
- 参数说明:
- Worker w:要结束的工作线程。
- boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true。
- 下面让我们看看 processWorkerExit 的源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1.工作线程-1操作
* 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
* 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 2.从线程set集合中移除工作线程,该过程需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将该worker已完成的任务数追加到线程池已完成的任务数
completedTaskCount += w.completedTasks;
// HashSet<Worker>中移除该worker
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 3.根据线程池状态进行判断是否结束线程池
tryTerminate();
/**
* 4.是否需要增加工作线程
* 线程池状态是running 或 shutdown
* 如果当前线程是突然终止的,addWorker()
* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
* 故如果调用线程池shutdown(),直到workQueue为空前,
* 线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
10、线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
- prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程
- prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程,并返回初始化的线程数
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true); // 注意传进去的参数是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true)) // 注意传进去的参数是null
++n;
return n;
}
参考文章:
1、https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 【美团技术团队博客】
大家可以去看一下美团技术团队写的关于线程池的文章,里面有线程池在业务中的实践。