1 ThreadPoolExecutor线程池初探
1.1 线程池的创建
- ThreadPoolExecutor
基于Executors创建线程池的方法就不在赘述,本文主要介绍通过ThreadPoolExecutor对象创建线程池
ThreadPoolExecutor的参数共有七个
- corePoolSize – 线程池中的核心线程数, 除非设置allowCoreThreadTimeOut(后续会介绍改方法),否则核心线将会一直保留在线程池中,
- maximumPoolSize – 线程池中的最大线程个数
- keepAliveTime – 超过corePoolSize的线程的最大空闲时间
- unit – keepAliveTime的时间单位
- workQueue – 用来保存提交到线程池的任务
- threadFactory – 线程池工厂
- handler – 当maximumPoolSize都在执行任务,workQueue也被已提交任务填满时,多余任务提交到Pool的拒绝策略
值得注意的是,超过corePoolSize的任务被提交到Pool中时,并不会立即创建线程来执行,而是会先将任务防止的workQueue中,当workQueue被任务填满后,才会创建线程来执行新提交的任务,知道线程数达到maximunPoolSize。
如下代码所示
向线程池中提交6个任务(T1~T6)时,线程池的线程数一直为1,超过coreSize的任务会在workQueue中等待
但当第7个任务提交时,由于coreSize线程都在执行任务,且wokrQueue已经满了,线程池会创建新的线程(不大于最大线程数的情况下)来执行T7任务
public static void main(String[] args){
System.out.println("begin");
ThreadPoolExecutor pool=new ThreadPoolExecutor(1,2,30L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 6; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1*1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行当前任务的线程名称:"+Thread.currentThread().getName());
}
});
}
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3*1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第七个任务:"+Thread.currentThread().getName());
}
});
System.out.println("当前线程池中运行的线程个数:"+pool.getActiveCount());
pool.shutdown();
System.out.println("end");
}
可以看到上面的代码运行结果如下,最后提交的任务反而优先执行(pool.getActiveCount() 并不一定能获取准确的运行数,这个后续在介绍)
begin
当前线程池中运行的线程个数:2
end
执行当前任务的线程名称:pool-1-thread-1
不在阻塞被执行:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
从ThreadPoolExecutor#execute也可以看出如下逻辑
int c = ctl.get();
//当前运行worker数如果小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//pool的状态为RUNNING,则入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查,并发操作可能pool已经处于非RUNNING状态
if (! isRunning(recheck) && remove(command))
reject(command);
//如果work为0,新增work,防止workQueue中的task一直无法被执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//入队失败,则尝试创建woeker执行,创建失败执行拒绝策略
else if (!addWorker(command, false))
reject(command);
1.2 线程池如何从workQueue获取Task
线程中用来执行任务的其实是ThreadPoolExecutor的一个私有内部类Worker实现了AQS和Runnable接口
它内部通过ThreadFactory持有一个Thread
调用worker的start方法实际执行的是runWorker方法,该方法中会从阻塞队列中不断获取task来执行
同时线程的空闲销毁也是在该处来判断的
Worker#runWorker方法如下所示:
//该标志用来判断work是否被异常退出
boolean completedAbruptly = true;
try {
//当前work的task不为空,或超时时间内从workqueue成功获取任务,则执行对应task
while (task != null || (task = getTask()) != null) {
//暂时省略
}
//从workQueue中获取task失败(超时或pool被关闭)
completedAbruptly = false;
} finally {
//销毁worker
processWorkerExit(w, completedAbruptly);
}
1.2 线程池中线程的销毁
Worker#getTask方法
// 是否允许核心线程池过期或当前worker数量是否大于核心线程池
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//timedOut代表上次获取是否超时
//pool可以动态调整maximumPoolSize大小,因此需要判断是否大于最大线程池数量或 当前线程数量大于核心数量,且上次获取Task超时,获取Task设置的超时时间是keepAliveTime,代表worker空闲时间大于等于keepAliveTime ,此时返回null,runWorker中的循环会结束,执行finally中的方法
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
Worker#processWorkerExit
//如果completedAbruptly为True,代表异常退出,需要workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//销毁线程worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//判断pool是否需要关闭
tryTerminate();
int c = ctl.get();
//如果线程池处于RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
//未被异常退出,可能是获取超时,次数queue中可能还有剩余任务,
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//若worker数量为0,需要创建worker来完成queue中剩余的任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//创建woker,保证消费queue中的任务
addWorker(null, false);
}
暂时粗略的介绍了线程池创建的参数,以及提交任务和线程池中线程的销毁策略,还有比较多的细节知识会在后续慢慢补充。