Java 线程知识笔记 (四) Executor与ThreadPool 其二

前言

上一篇【Java 线程知识笔记 (三)】我们从源码里面探究了一个线程在线程池中是如何执行的,由于线程池有一个很大的框架,仅仅通过这点东西就说明白线程池,尤其是对于初学者来说更是难以理解。因此这篇将会对源码讲解中遗漏的一些内容,比如线程池的状态,以及线程池中线程是在哪里创建的等等做一个补漏,同时对线程池的基本内容进行一个概念性的阐述。

线程池的优点

上一篇中对线程池和单线程的效率做了一个对比的例子,同时开启10000个线程对一个list进行操作的时候,线程池的效率几乎是创建线程的100倍。之所以会有这样一个大的差距,是因为消耗资源的线程创建和线程运行解耦了,创建与调度交给了线程池,而外部(我们)只要关心如何运行就可以了。线程作为一个稀缺资源,从此不再是一次性的,而是可以复用了。其实可以看出线程池的核心思想就是:把宝贵的资源放到一个池子中,每次使用都从里面获取,用完之后又放回池子供其他人使用。

线程池的框架

线程池的框架很大,下图是Java中整个线程池的类图。Executor接口是整个线程池的顶层接口,ExecutorService接口则声明了众多操作线程池的方法, ThreadPoolExecutor类则是我们最常用的去初始化线程池的类,它和AbstractExecutorService类一起封装实现了很多常用的方法,比如submit()executor()shutdown(), invoke()等等。

Java 线程知识笔记 (四) Executor与ThreadPool 其二

从上面的图中可以看到,我们经常使用的Executors类并没有出现在线程池这个框架的类图里面,明明是一个和线程池息息相关的类为什么没有呢?因为Executors类是一个工具类,里面所有的方法都是静态方法,为了实现Java中默认的不同功能的线程池而使用的。老话说的好:工具人不配有姓名,Executors类就是这么一个工具人。

线程池的创建

当然我们也可以根据自己的需求去实现ThreadPoolExecutor类,或者做类似Executors这种工具人出来,但是我们的讲解还是要从Java中默认的内容出发。Executors类的官方文档上的说明如下,其中包含了很多不同种类的创建线程池方法,并返回各种需要的类型。

public class Executors extends Object 工厂和工具方法Executor ,
ExecutorService , ScheduledExecutorService ,
ThreadFactory和Callable在此包中定义的类。 该类支持以下几种方法:
• 创建并返回一个ExecutorService设置的常用的配置设置的方法。
• 创建并返回一个ScheduledExecutorService的方法, 其中设置了常用的配置设置。
• 创建并返回“包装”ExecutorService的方法,通过使实现特定的方法无法访问来禁用重新配置。
• 创建并返回将新创建的线程设置为已知状态的ThreadFactory的方法。
• 创建并返回一个方法Callable出的其他闭包形式,这样他们就可以在需要的执行方法使用Callable 。

常用的线程池

Executors里面的方法很多有兴趣可以自己去看【Java API传送门】。多归多,但是常用的也就那么几个,根据创建的方式不同可以分为下面几个大类:

名字 返回值 说明
newFixedThreadPool(int nThreads) ExecutorService 创建一个固定大小的线程池,其中运行的线程共享一个无边界的队列。可以类比为数组,超出数量的任务要在等待队列中等待。
newCachedThreadPool() ExecutorService 创建一个缓冲的线程池。根据需求在其中创建线程,一旦线程被创建,此线程即可被复用。可以类比为只能增加容量的变长数组,如果超出当前容量则创建一个新的线程。
newSingleThreadExecutor() ExecutorService 创建一个使用单一工作线程的Executor,其运行时共享无边界队列。单例线程池按照不同的次序完成不同的任务,比如上篇例子中就是用这个进行的list添加,提高线程的重用性,才会有相差100倍的运行效果。
newScheduledThreadPool(int corePoolSize) ScheduledExecutorService 计划线程池,这种线程池可以用来delay任务,或者执行定时任务。

话说Executors是工具人,因为最终都会调用到ThreadPoolExecutor去实现上面不同的的功能,只不过传入的参数不同而已,正是因为传递的参数不同才造成了连接池的表现不同。ThreadPoolExecutor构造方法已经在上篇中详细说过,不多说。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1)); //不用怀疑这里最终也是跳到ThreadPoolExecutor
}

线程池的生命周期

在之前的讲解Thread的博客中说过线程的生命周期,线程池同样也是有生命周期的,而且上篇中的源码部分已经就出现了线程池的状态判断。由于篇幅没有详细介绍,这里就说一下线程池的生命周期。线程池的状态定义在ThreadPoolExecutor里面,下面是官方源码中的注释。

/** 官网定义的状态和说明
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 官网定义的状态转换
* RUNNING -> SHUTDOWN
*    On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
*    On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
*    When both queue and pool are empty
* STOP -> TIDYING
*    When pool is empty
* TIDYING -> TERMINATED
*    When the terminated() hook method has completed
*/

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

状态描述

状态名称 状态名称 状态说明
RUNNING 运行状态 RUNNING可以接受任务执行队列里的任务。也就是说线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
SHUTDOWN 关闭状态 处于SHUTDOWN 状态时,线程池不接收新任务,但是可以继续处理已添加到队列的已有任务。
STOP 停止状态 STOP状态,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
TIDYING 整理状态 当线程池中所有的任务已终止,任务数量为0时,线程池会变为TIDYING状态,并且运行钩子方法terminated()。
TERMINATED 终止状态 terminated()运行完成,线程池就会处于这种状态,此状态时线程池彻底终止。

注:要注意的是terminated()在ThreadPoolExecutor类中是空的。若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()方法来实现。


状态转换:线程池的状态之间的转换和线程类似。
Java 线程知识笔记 (四) Executor与ThreadPool 其二


COUNT_BITS 与 CAPACITY
解释完状态的转换,就要解释COUNT_BITS的作用了。Integer.SIZE是32表示Integer是32位整型,我们一共只有5个状态,犯得着用Integer.SIZE去表示吗?首先说COUNT_BITS是什么:COUNT_BITS表示线程数量。COUNT_BITS = Integer.SIZE - 3也就是说最多有29位表示线程数量。那么后面的int RUNNING = -1 << COUNT_BITS就好理解了:状态用int型表示,左移29位就是状态,一共32位,高3位是状态,剩下29位就是线程数量。用高3位标识-1(111)0(000)1(001)2(010)3(011),因此线程池默认的最大容量CAPACITY 就是29位能表示的最大值2^30-1= ‭536870911‬。


线程池的安全关闭

一般来说,线程池不会关闭,因为用到线程池的场景会反复的调用线程,因此会维持一定数量的线程等待请求。但是如果某些需求需要手动关闭的的时候,就需要我们手动去维护了,但是安全的关闭线程池也是一个要注意的点。上面我们知道只有处于TERMINATED才表示真正关闭了线程池,因为为了保证线程池内所有的线程都安全的结束,我们需要等待线程池处于TERMINATED的状态才可以,比如:

ExecutorService service= Executors.newSingleThreadExecutor();
service.shutdown();
while(service.isTerminated()){
    //使用一个循环空转,直到线程池状态处于TERMINATED为止
}
System.out.println("pool is closed");

线程的复用

说到最后还有一个关键点,线程池是如何做到线程复用的。上一篇说到线程池中所有的任务都会被转换成Worker类对象执行,直接去Worker进入构造方法。

private boolean addWorker(Runnable firstTask, boolean core) {
    /**略**/
    Worker w = null;
    try {
        w = new Worker(firstTask); //直接进入这里
        final Thread t = w.thread; //这里的线程也是拿的Worker对象的
        if (t != null) {
            /**略**/
            if (workerAdded) {
                t.start(); //启动线程
                workerStarted = true;
            }
        }
    }
         /**略**/
    return workerStarted;
}

进入后发现thread是通过一个newThread()方法构建出来的,但是传入的是this,也就是Worker这个内部类对象本身,接着往下走会进入默认的线程工厂中。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask; //task传值
    this.thread = getThreadFactory().newThread(this);//创建核心线程
}

这个方法在Executors.DefaultThreadFactory#newThread()这里。

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

进来以后发现这里还是new出来一个线程再用,那线程池说好的线程复用呢?其实这里new出来的Worker里的线程是线程池的核心线程。因为当线程被拿出来以后,调用了start()方法启动了这个new出来的核心线程,所以我们继续去Worker.run()里面。

public void run() {
    runWorker(this);
}

接着往runWorker()里走。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    /**略**/
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            /**略**/
            try {
                /**略**/
                try {
                    task.run(); //执行线程逻辑
                } catch (RuntimeException x) {
                /**略**/
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到这里task被拿出来,然后经过while循环。点睛之处就在这个while循环的条件里,只要task不是空的之就行,但是如果task是空的那么就从getTask()里面取数据。getTask()正是从阻塞队列workQueue中取任务的方法。这样线程池中的线程永远都不会停下来,源源不断地从队列中取出任务执行。那么如果创建一个核心线程数为2,最大线程数为3的线程池执行逻辑就是这样的:

  1. 任务1进来,开一个线程池核心线程1,直接执行。
  2. 任务2进来,再开一个核心线程2,直接执行。
  3. 任务3进来,此时发现核心线程已经都开起了,而且在RUNNING状态,把任务3放进队列。
  4. 核心线程1执行任务1结束,经过while循环使用getTask()拿出任务3执行。
  5. 任务n进来,发现核心线程满了,尝试放进队列,发现队列也满了,直接创建一个非核心线程f1,直接执行。
  6. 任务n+1进来,发现核心线程满了,队列满了,非核心线程满了,报错。

如此核心线程永远不停下来,永远空转从队列中拿任务执行直到线程池被杀掉。而非核心线程是有空转超时时间的,因此到时间就自己结束了。这就是线程池中线程复用的本质。

总结

到此线程池的内容基本就结束了,这篇博客对上一篇源码分析做了一个知识点的补充,以及最终把线程池中的线程复用从源码角度和逻辑上梳理清楚。希望这些内容对各位读者理解线程池有所帮助。

上一篇:线程的建立


下一篇:Java.util.concurrent之 Executor框架与线程池