前言
上一篇【Java 线程知识笔记 (三)】我们从源码里面探究了一个线程在线程池中是如何执行的,由于线程池有一个很大的框架,仅仅通过这点东西就说明白线程池,尤其是对于初学者来说更是难以理解。因此这篇将会对源码讲解中遗漏的一些内容,比如线程池的状态,以及线程池中线程是在哪里创建的等等做一个补漏,同时对线程池的基本内容进行一个概念性的阐述。
线程池的优点
上一篇中对线程池和单线程的效率做了一个对比的例子,同时开启10000个线程对一个list进行操作的时候,线程池的效率几乎是创建线程的100倍。之所以会有这样一个大的差距,是因为消耗资源的线程创建和线程运行解耦了,创建与调度交给了线程池,而外部(我们)只要关心如何运行就可以了。线程作为一个稀缺资源,从此不再是一次性的,而是可以复用了。其实可以看出线程池的核心思想就是:把宝贵的资源放到一个池子中,每次使用都从里面获取,用完之后又放回池子供其他人使用。
线程池的框架
线程池的框架很大,下图是Java中整个线程池的类图。Executor
接口是整个线程池的顶层接口,ExecutorService
接口则声明了众多操作线程池的方法, ThreadPoolExecutor
类则是我们最常用的去初始化线程池的类,它和AbstractExecutorService
类一起封装实现了很多常用的方法,比如submit()
、executor()
、shutdown()
, invoke()
等等。
从上面的图中可以看到,我们经常使用的Executors
类并没有出现在线程池这个框架的类图里面,明明是一个和线程池息息相关的类为什么没有呢?因为Executors
类是一个工具类,里面所有的方法都是静态方法,为了实现Java中默认的不同功能的线程池而使用的。老话说的好:工具人不配有姓名,Executors
类就是这么一个工具人。
线程池的创建
当然我们也可以根据自己的需求去实现ThreadPoolExecutor
类,或者做类似Executors
这种工具人出来,但是我们的讲解还是要从Java中默认的内容出发。Executors
类的官方文档上的说明如下,其中包含了很多不同种类的创建线程池方法,并返回各种需要的类型。
public class Executors extends Object 工厂和工具方法Executor ,
ExecutorService , ScheduledExecutorService ,
ThreadFactory和Callable在此包中定义的类。 该类支持以下几种方法:
• 创建并返回一个ExecutorService设置的常用的配置设置的方法。
• 创建并返回一个ScheduledExecutorService的方法, 其中设置了常用的配置设置。
• 创建并返回“包装”ExecutorService的方法,通过使实现特定的方法无法访问来禁用重新配置。
• 创建并返回将新创建的线程设置为已知状态的ThreadFactory的方法。
• 创建并返回一个方法Callable出的其他闭包形式,这样他们就可以在需要的执行方法使用Callable 。
常用的线程池
Executors
里面的方法很多有兴趣可以自己去看【Java API传送门】。多归多,但是常用的也就那么几个,根据创建的方式不同可以分为下面几个大类:
名字 | 返回值 | 说明 |
---|---|---|
newFixedThreadPool(int nThreads) | ExecutorService | 创建一个固定大小的线程池,其中运行的线程共享一个无边界的队列。可以类比为数组,超出数量的任务要在等待队列中等待。 |
newCachedThreadPool() | ExecutorService | 创建一个缓冲的线程池。根据需求在其中创建线程,一旦线程被创建,此线程即可被复用。可以类比为只能增加容量的变长数组,如果超出当前容量则创建一个新的线程。 |
newSingleThreadExecutor() | ExecutorService | 创建一个使用单一工作线程的Executor,其运行时共享无边界队列。单例线程池按照不同的次序完成不同的任务,比如上篇例子中就是用这个进行的list添加,提高线程的重用性,才会有相差100倍的运行效果。 |
newScheduledThreadPool(int corePoolSize) | ScheduledExecutorService | 计划线程池,这种线程池可以用来delay任务,或者执行定时任务。 |
话说Executors是工具人,因为最终都会调用到ThreadPoolExecutor去实现上面不同的的功能,只不过传入的参数不同而已,正是因为传递的参数不同才造成了连接池的表现不同。ThreadPoolExecutor构造方法已经在上篇中详细说过,不多说。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1)); //不用怀疑这里最终也是跳到ThreadPoolExecutor
}
线程池的生命周期
在之前的讲解Thread的博客中说过线程的生命周期,线程池同样也是有生命周期的,而且上篇中的源码部分已经就出现了线程池的状态判断。由于篇幅没有详细介绍,这里就说一下线程池的生命周期。线程池的状态定义在ThreadPoolExecutor里面,下面是官方源码中的注释。
/** 官网定义的状态和说明
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 官网定义的状态转换
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
状态描述:
状态名称 | 状态名称 | 状态说明 |
---|---|---|
RUNNING | 运行状态 | RUNNING可以接受任务执行队列里的任务。也就是说线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。 |
SHUTDOWN | 关闭状态 | 处于SHUTDOWN 状态时,线程池不接收新任务,但是可以继续处理已添加到队列的已有任务。 |
STOP | 停止状态 | STOP状态,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 |
TIDYING | 整理状态 | 当线程池中所有的任务已终止,任务数量为0时,线程池会变为TIDYING状态,并且运行钩子方法terminated()。 |
TERMINATED | 终止状态 | terminated()运行完成,线程池就会处于这种状态,此状态时线程池彻底终止。 |
注:要注意的是terminated()
在ThreadPoolExecutor类中是空的。若用户想在线程池变为TIDYING
时,进行相应的处理,可以通过重载terminated()
方法来实现。
状态转换:线程池的状态之间的转换和线程类似。
COUNT_BITS 与 CAPACITY :
解释完状态的转换,就要解释COUNT_BITS
的作用了。Integer.SIZE
是32表示Integer是32位整型,我们一共只有5个状态,犯得着用Integer.SIZE
去表示吗?首先说COUNT_BITS
是什么:COUNT_BITS
表示线程数量。COUNT_BITS = Integer.SIZE - 3
也就是说最多有29位表示线程数量。那么后面的int RUNNING = -1 << COUNT_BITS
就好理解了:状态用int型表示,左移29位就是状态,一共32位,高3位是状态,剩下29位就是线程数量。用高3位标识-1(111)
、0(000)
、1(001)
、2(010)
、3(011)
,因此线程池默认的最大容量CAPACITY
就是29位能表示的最大值2^30-1= 536870911
。
线程池的安全关闭
一般来说,线程池不会关闭,因为用到线程池的场景会反复的调用线程,因此会维持一定数量的线程等待请求。但是如果某些需求需要手动关闭的的时候,就需要我们手动去维护了,但是安全的关闭线程池也是一个要注意的点。上面我们知道只有处于TERMINATED
才表示真正关闭了线程池,因为为了保证线程池内所有的线程都安全的结束,我们需要等待线程池处于TERMINATED
的状态才可以,比如:
ExecutorService service= Executors.newSingleThreadExecutor();
service.shutdown();
while(service.isTerminated()){
//使用一个循环空转,直到线程池状态处于TERMINATED为止
}
System.out.println("pool is closed");
线程的复用
说到最后还有一个关键点,线程池是如何做到线程复用的。上一篇说到线程池中所有的任务都会被转换成Worker
类对象执行,直接去Worker
进入构造方法。
private boolean addWorker(Runnable firstTask, boolean core) {
/**略**/
Worker w = null;
try {
w = new Worker(firstTask); //直接进入这里
final Thread t = w.thread; //这里的线程也是拿的Worker对象的
if (t != null) {
/**略**/
if (workerAdded) {
t.start(); //启动线程
workerStarted = true;
}
}
}
/**略**/
return workerStarted;
}
进入后发现thread
是通过一个newThread()
方法构建出来的,但是传入的是this
,也就是Worker
这个内部类对象本身,接着往下走会进入默认的线程工厂中。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; //task传值
this.thread = getThreadFactory().newThread(this);//创建核心线程
}
这个方法在Executors.DefaultThreadFactory#newThread()
这里。
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
进来以后发现这里还是new
出来一个线程再用,那线程池说好的线程复用呢?其实这里new
出来的Worker
里的线程是线程池的核心线程。因为当线程被拿出来以后,调用了start()
方法启动了这个new
出来的核心线程,所以我们继续去Worker.run()
里面。
public void run() {
runWorker(this);
}
接着往runWorker()
里走。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
/**略**/
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
/**略**/
try {
/**略**/
try {
task.run(); //执行线程逻辑
} catch (RuntimeException x) {
/**略**/
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看到这里task
被拿出来,然后经过while
循环。点睛之处就在这个while
循环的条件里,只要task
不是空的之就行,但是如果task
是空的那么就从getTask()
里面取数据。getTask()
正是从阻塞队列workQueue
中取任务的方法。这样线程池中的线程永远都不会停下来,源源不断地从队列中取出任务执行。那么如果创建一个核心线程数为2,最大线程数为3的线程池执行逻辑就是这样的:
- 任务1进来,开一个线程池核心线程1,直接执行。
- 任务2进来,再开一个核心线程2,直接执行。
- 任务3进来,此时发现核心线程已经都开起了,而且在RUNNING状态,把任务3放进队列。
- 核心线程1执行任务1结束,经过while循环使用getTask()拿出任务3执行。
- 任务n进来,发现核心线程满了,尝试放进队列,发现队列也满了,直接创建一个非核心线程f1,直接执行。
- 任务n+1进来,发现核心线程满了,队列满了,非核心线程满了,报错。
如此核心线程永远不停下来,永远空转从队列中拿任务执行直到线程池被杀掉。而非核心线程是有空转超时时间的,因此到时间就自己结束了。这就是线程池中线程复用的本质。
总结
到此线程池的内容基本就结束了,这篇博客对上一篇源码分析做了一个知识点的补充,以及最终把线程池中的线程复用从源码角度和逻辑上梳理清楚。希望这些内容对各位读者理解线程池有所帮助。