线程池
线程池
- 线程池的重要性
- 重用线程,避免线程创建与销毁带来的性能开销。
- 控制线程池的最大并发数,避免大量线程之间因为互相抢占系统资源而导致的阻塞现象。
- 能够对线程进行简单管理,并且提供定时执行以及制定间隔循环执行等功能。
-
线程池的好处
- 加快响应速度,消除了线程创建带来的延迟
- 合理利用CPU和内存,达到一个资源占用与效率之间的平衡。
- 统一管理
-
线程池的适用场合
- 服务器接收到大量请求时,使用线程池技术是非常合适的,可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
- 实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。
创建与停止线程池
-
线程池构造函数主要参数
int corePoolSize:核心线程数,线程池在完成初始化以后,默认情况下,线程池中没有线程,线程池会等待有任务到来时,再创建新的线程去完成任务。
int maximumPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这就是最大量maximumPoolSize。
long keepAliveTime:存活时间,如果线程池当前的线程数多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,他们就会被终止。这种机制可以在线程池占用资源的过程中减少冗余消耗。默认情况下是只有多于corePoolSize的线程会被回收,除非是修改了allowCoreThreadTimeOut属性设置true,那么keepAliveTime也会同样作用于核心线程。
BlockingQueue
workQueue:任务存储队列,3中常见的队列类型,(1)直接交接:SynchronousQueue,(2)*队列:LinkedBlockingQueue,(3)有界的队列:ArrayBlockingQueue。 ThreadFactory threadFactory:当线程池需要新的线程时,会使用线程工厂来生成新的线程。默认使用 Executor.defaultThreadFacdtory(),创建出来的线程都在统一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否守护线程等等,只是一般用默认的就足够了。
RejectedExecutionHandler handler:由于线程池无法接受提交的任务而拒绝策略
所以线程的添加规则是:
- 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来执行新任务。
- 如果线程数等于或者大于corePoolSize但是少于maximumPoolSize,就将任务放入队列workQueue。
- 如果队列已满,并且线程数小于maximumPoolSize,就创建一个新的线程来执行任务。
- 如果队列已经满了,并且线程数大于或等于maximumPoolSize,就拒绝这个任务。
从而得出线程池中增减线程的特点:
- 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
- 线程池希望保持较少数的线程数,并且只有在复杂变得很大时,才增加它。
- 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
- 是只有在队列填满时才创建多余corePoolSize的线程,所以如果使用的是*队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
-
手动创建与自动创建
手动创建更好,因为可以更加明确线程池的运行规则,避免资源耗尽的风险。相反,自动创建线程池,直接调用JDK封装好的构造函数,可能会带来一些问题。
-
newFixedThreadPool
由于传进去的LinkedBlockingQueue是没有容量上限的,所以当传入的请求越来越多,并且无法及时处理完毕的时候,就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM。
-
newSingleThreadPool
单线程的线程池,它只会用唯一的线程来执行任务。这个和FixedThreadPool的原理基本相同,只是把核心线程数和最大线程数都设置为了1,所以也会导致同样的问题,也就是请求堆积的时候,可能会占用大量的内存。
-
CachedThreadPool
可缓存线程;*线程池,具有自动回收多余线程的功能。它的弊端在于第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至直接导致OOM。
-
ScheduleThreadPool
它支持定时以及周期性任务执行的线程池。
所以线程池的创建,最好的方式应该是手动创建,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,想给线程取什么名字等等。
-
-
线程池中线程数量的合适设定
CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
-
耗时IO型(读写数据库、文件、网络读写等等):最佳线程数一般会大于CPU核心数很多被,以JVM线程监控显示频繁情况为依据,保证线程空闲可以衔接上,参考BrainGoetz推荐的计算方法:
线程数=CPU核心数*(1+平均等待时间/平均工作时间)
-
停止线程池的正确方法
- shutdown:不会马上停止,已经提交的任务会执行完,新的任务无法再被提交了。
- isShutdown:返回是否shutdow,即是线程池中的任务还在执行。
- isTerminated:它可返回整个线程池是不是已经完全停止了。
- awaitTermination:测试一段时间内线程池是不是会完全停止,起到的作用是检测。这个方法在返回之前是阻塞的,只有当线程池中所有任务都执行完毕,或者等待时间到了,以及等待过程中被中断了抛出异常。
- shutdownNow:马上停止线程池,用interrupt信号去通知正在执行的线程停止,而队列当中等待的任务则直接返回runableList,可以用于再以后通过其他方式执行。
常见线程池的特点和用法
- newFixedThreadPool
- newSingleThreadPool
- CachedThreadPool
- ScheduleThreadPool
- 以及JDK1.8加入的WorkingStealingPool:这个线程池和其他的有很大的不同。加入的任务不是普通任务,而且是有子任务的情况下会适合这个场景,比如说二叉树遍历、矩阵的处理。使用这个线程池有一定的窃取能力,每一个线程之间是会合作的。(使用场景有限)
FixedThreadPool与SingleThreadPool的Queue是LinkedBlockingQueue?
他们的线程数量已经固定了,添加的任务数量无法估计,所以是用了一个可以存储无限多任务的队列来帮助存储任务。
CachedThreadPool使用的Queue是SynchronousQueue?
SynchronousQueue内部实际上是不存储的,但是在CachedThreadPool这个线程池的情况下也根本不需要队列来存储,会直接启动新的线程,效率更高些,也不需要一个队列去中转了。
ScheduleThreadPool使用的是延迟队列DelayedWorkQueue。
拒绝任务
- 拒绝时机
- 当Executor关闭时,提交新任务会被决绝
- 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
- 4种拒绝策略
- AbortPolicy:直接抛出一个异常
- DiscardPolicy:默默地把任务丢弃
- DiscardOldestPolicy:丢弃队列中最老的任务,以便腾出空间存储添加的新任务。
- CallerRunsPolicy:让提交任务的这个线程去执行任务。这种方式避免了任务损失,也是一种负反馈策略,给了线程池缓冲的时间。
钩子方法
- 在有特殊用途或者需要在每个任务执行前后进行一些日志或者统计,等等。
线程池实现原理
线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等,这么多和线程池相关的类,大家都是什么关系?
首先它们的继承实现关系依次是这样的:Executor <-ExecutorService <- AbstractExecutorService <- ThreadPoolExecutor
线程池实现任务复用的原理:
相同线程执行不同任务。源码的分析如下:
executorService.execute(new Task());
public interface Executor {
void execute(Runnable command);
}
class ThreadPoolExecutor {
//其它省略...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl记录了线程池状态和线程数
int c = ctl.get();
//首先检查当前线程熟练是不是小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
//如果不够,就创建新的线程,command就是新的任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//检查是不是running状态,如果是就继续放到工作队列中
if (isRunning(c) && workQueue.offer(command)) {
//由于在此期间线程可能会被终止了,所以再进行一次判断
int recheck = ctl.get();
//如果现在不是正在运行,就将这个任务删除,并且使用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程已经减少至0,就需要创建新的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//能够执行到这里,说明线程已经停止,
//或者队列已经放不进去了线程数也大于核心线程数了
//就要增加线程,直到达到最大线程数量。
else if (!addWorker(command, false))
reject(command);//如果已经不能再增加了,就拒绝
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//这个Worker,就是主要的工作相关的类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//这个方法反应了线程的复用
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取到Runable类型的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//只要这个任务不为空,就去执行,getTask()就是从阻塞队列中去取出任务
//while循环意味着整个worker不会停止,不停的执行任务,取出新的任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
//执行Runable的run方法
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
- 线程池管理器
- 工作线程
- 任务队列
- 任务接口(Task)
线程池的五种状态:
- RUNNING:接受新任务并且处理排队任务
- SHUTDOWN:不接受新任务,但处理排队任务
- STOP:不接受新任务,也不处理排队任务
- TIDYING:中文是整洁,理解了中文就容易理解这个状态,所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并且将运行terminate()钩子方法。
- TERMINATED:terminate()方法运行完成。
使用线程池的注意点
- 避免任务的堆积
- 避免线程数过度增加
- 排查线程泄露:线程执行完毕却不能回收,往往是任务逻辑有问题,导致线程结束不了。