一、Executors创建线程池的工厂方法
Executors是线程池的工具类,它提供了四种创建线程池的工厂方法:
- newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行;
- newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中;
- newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60
秒后自动回收;- newScheduledThreadPool:创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
上面4种有些是因为*队列容易引发OOM,有些是因为没有最大线程数的限制资料浪费,这些都有风险,所以阿里建议使用ThreadPoolExecutor的方式手动创建线程池。
二、ThreadPoolExecutor构造参数
- corePoolSize:线程池核心线程个数;
- maximunPoolSize:线程池最大线程数量。
- keeyAliveTime:空闲线程的最大存活时间。注意这里说的是非核心线程!
- TimeUnit,存活时间的时间单位。
- workQueue:用于保存等待执行的任务的阻塞队列。比如基于数组的有界 ArrayBlockingQueue,基于链表的*LinkedBlockingQueue,最多只有一个元素的同步队列 SynchronousQueue,优先队列PriorityBlockingQueue 等。
- ThreadFactory:创建线程的工厂。可以自定义个线程工厂指定线程池的名称。
- RejectedExecutionHandler:饱和策略,当队列满了并且线程个数达到 maximunPoolSize 后采取的策略,比如AbortPolicy(抛出异常),CallerRunsPolicy(使用调用者所在线程来运行任务),DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务),DiscardPolicy(默默丢弃,不抛出异常)。
三、ThreadPoolExecutor执行流程图
四、ThreadPoolExecutor执行源码
1、线程池的状态
ThreadPoolExecutor 成员变量 ctl 是个 Integer 的原子变量用来记录线程池状态 和 线程池中线程个数,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程个数。
//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态:
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态 和 线程池中线程 个数的查询和操作,后面线程池执行的时候会用到。
// 通过二进制的与运算获取高三位 运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 通过二进制的与运算获取低29位 线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }
2、ThreadPoolExecutor#execute
public void execute(Runnable command) {
//(1) 如果任务为null,则抛出NPE异常
if (command == null)
throw new NullPointerException();
//(2)获取当前线程池的状态+线程个数变量的组合值
int c = ctl.get();
//(3)当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//(4.1)二次检查
int recheck = ctl.get();
//(4.2)如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//(4.3)否者如果当前线程池线程空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//(5)如果队列满了,则新增线程,新增失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
接下来看新增线程的 addWorkder 方法的源码,如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//(6) 若线程池状态为 STOP,TIDYING,TERMINATED,或者队列没有任务时则不添加worker!
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//(7)循环cas增加线程个数
for (;;) {
int wc = workerCountOf(c);
//(7.1)如果线程个数超限则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//(7.2)cas增加线程个数,同时只有一个线程成功
if (compareAndIncrementWorkerCount(c))
break retry;
//(7.3)cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//(8)cas增加线程个数,接下来则创建worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//(8.1)创建worker
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//(8.2)加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
mainLock.lock();
try {
//(8.3)重新检查线程池状态,为了避免在获取锁前调用了shutdown接口
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//(8.4)添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//(8.5)添加成功则启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// (8.6)如果worker启动失败,则从HashSet剔除当前worker,并递减实际工作线程数。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
如上代码主要分两部分,第一部分的双重循环目的是通过 cas 操作增加线程池线程数,第二部分主要是并发安全的把任务添加到 workers 里面,并且启动任务执行。
接着我们再看看runWorker方法,代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //(9)status设置为0,允许中断
boolean completedAbruptly = true;
try {
//(10)
while (task != null || (task = getTask()) != null) {
//(10.1)
w.lock();
...
try {
//(10.2)任务执行前干一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//(10.3)执行任务(执行具体的任务)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//(10.4)任务执行完毕后干一些事情
afterExecute(task, thrown);
}
} finally {
task = null;
//(10.5)统计当前worker完成了多少个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//(11)执行清工作
processWorkerExit(w, completedAbruptly);
}
}
上面在执行具体任务期间加锁,是为了避免任务运行期间,其他线程调用了 shutdown 或者 shutdownNow 命令关闭了线程池。
3、Worker
先看下Worker的继承关系:
由上图可知,Worker继承了AQS和Runnable接口,而Worker重写或复用了AQS的锁相关的方法。
再看下Worker类里2个成员变量:
final Thread thread;
thread属性是Worker维护的线程,每个Worker对象一个,这个就是用来执行任务用的线程,也就是说,Worker对象的数量也就代表了线程池中活动线程的数量。
Runnable firstTask;
Worker对象的初始任务,多数情况下每个Worker对象创建时都伴随一个初始任务,是这个Worker对象优先会执行的任务,当执行完firstTask后,Worker对象还会从线程池中继续获取任务并执行。
由Worker里的两个成员变量可知,Worker是对线程和任务的封装,task任务就是交由Worker里的thread来执行的!