ThreadPoolExecutor的核心参数以及线程池状态到底是咋玩的?

文章目录

author:编程界的小学生

date:2021/05/16

flag:不写垃圾没有营养的文章!

如果下面的问题你都会的话就别在这浪费时间啦

  • 线程池的参数有哪些?分别代表啥意思?

  • 线程池是怎么创建线程的?

  • 任务拒绝策略有哪些?

  • 线程池的状态都有哪些?怎么存储的?怎么计算的(如何获取到的状态变量?如何获取到的线程数?)?

  • 为什么用-1来表示运行中RUNNING状态?

  • new一个线程池,他的活跃线程数是多少?怎么计算的?

PS:像线程池流程源码、核心非核心线程怎么保证一直存活等等问题后面会讲解,此篇幅核心是线程池状态的那个位运算到底是咋玩的。

1、线程池参数

1.1、线程池参数有哪些?都啥意思?

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}
  • corePoolSize:核心线程数

线程池在完成初始化之后,默认情况下,线程池中不会有任何线程,线程池会等有任务来的时候再去创建线程,核心线程创建出来后即使超出了线程保持的存活时间配置也不会销毁,核心线程只要创建了就永驻了,就等着新任务来进行处理。除非设置了allowCoreThreadTimeOut,否则核心线程数是保持活动的最小数量0

  • maximumPoolSize:最大线程数

核心线程忙不过来且任务队列都满了的情况下,还有新任务继续提交进来的话就会新开辟线程,但是也不会任意的开辟线程数量,线程数(包含核心线程数)阈值是maximumPoolSize,达到阈值后还在提交任务的话就走拒绝策略。

  • keepAliveTime:非核心线程保持存活的时间

如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲之间超出keepAliveTime的话,则这些线程就会被回收。

  • unit:非核心线程保持存活的时间单位

比如:TimeUnit.MILLISECONDSTimeUnit.SECONDS

  • workQueue:任务存储队列

核心线程数满了后还在继续提交任务到线程池的话,就先进入workQueueworkQueue通常有以下几种选择:

LinkedBlockingQueue:*队列,默认长度限制是int的最大值。也可以自定义大小。

ArrayBlockingQueue:有界队列,可以自定义大小。

SynchronousQueueExecutors.newCachedThreadPool();默认使用的队列。也不算是个队列,他不没有存储元素的能力。

一般我都采取LinkedBlockingQueue,因为他也可以设置大小,可以取代ArrayBlockingQueue有界队列。

  • threadFactory:当线程池需要新的线程时,会用threadFactory来生成新的线程。

默认采用的是DefaultThreadFactory,主要负责创建线程。newThread()方法。创建出来的线程都在同一个线程组里且优先级是一样的。

  • handler:拒绝策略。任务超出线程池的配置限制后或执行shutdown后还在继续提交任务的话,会执行handler里的逻辑。

默认采取的是AbortPolicy拒绝策略。也就是直接抛出RejectedExecutionException异常。

1.2、线程池是怎么创建线程的?

是通过java.util.concurrent.ThreadFactory#newThread来负责创建的,ThreadFactory是个接口,比如有个Default实现io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)

public Thread newThread(Runnable r) {
    Thread t = this.newThread(FastThreadLocalRunnable.wrap(r), this.prefix + this.nextId.incrementAndGet());

    try {
        if (t.isDaemon() != this.daemon) {
            t.setDaemon(this.daemon);
        }

        if (t.getPriority() != this.priority) {
            t.setPriority(this.priority);
        }
    } catch (Exception var4) {
        ;
    }

    return t;
}

可以看到如下信息:

  • 就是个线程工厂类一样,负责创建线程,创建的方式也是直接new Thread
  • 并没有真正的启动,也就是没有调用start方法,只是new出来了而已,也就是说new一个线程池并没有启动线程

真正启动线程是在来任务的时候启动的,之后文章会深度剖析。

  • 线程优先级以及是否是守护线程都是可以配置

1.3、任务拒绝策略有哪些?

顶层父接口是java.util.concurrent.RejectedExecutionHandler,其四大子类如下:

  • AbortPolicy:抛出一个RejectedExecutionException异常,默认的拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() {}
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • DiscardPolicy:直接丢弃任务,源码很简单,我不做任何处理,其实就是丢弃嘛
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • DiscardOldestPolicy:丢弃队列里最老的任务(直接从queue中弹出最前面的一个任务),将当前这个任务继续提交给线程池
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
  • CallerRunsPolicy:交给线程池调用所在的线程进行处理。也就是说不允许丢任何任务,一般用于不允许失败的、对性能要求不高、并发量较小的场景下使用,因为提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

1.4、线程池的原理是什么?

  • 线程池刚启动的时候核心线程数为0。
  • 丢任务给线程池的时候线程池会开启线程来执行这个任务。
  • 若线程数小于corePoolSize的话,即使工作线程处于空闲状态,也会创建一个新线程来执行任务。
  • 若线程数大于等于corePoolSize的话,则会将任务放到workQueue,也就是任务队列。
  • 若任务队列满了,且线程数小于maximumPoolSize,则会创建一个新线程来运行任务。
  • 若任务队列满了,且线程数大于等于maximumPoolSize,则会直接采取拒绝策略。

ThreadPoolExecutor的核心参数以及线程池状态到底是咋玩的?

PS:源码会在之后的文章深度剖析。

2、线程池的状态

2.1、线程池有哪几种状态?

如下五种:

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING:接收新任务和线程池队列中的任务
  • SHUTDOWN:不接受新任务,但是接收线程池队列中的任务
  • STOP:不接收新任务也不接收线程池队列中的任务,并且打断正在进行中的任务
  • TIDYING:所有任务终止,待处理任务数量为0的时候,线程转换为TIDYING状态,将会执行terminated钩子函数
  • TERMINATED:terminated函数执行完成后会变为此状态

2.2、线程池的状态是怎么存储的?

采取一个int类型名叫ctl的变量来存储。低29位存储线程池中线程的活跃数量,高3位存储线程池的状态。

如下源码:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态,只有RUNNING状态是负数。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

为啥这么设计?因为作者是Doug Lea,当然是节省空间,如果弄一个int存储线程数,一个int存储仅需要3位的状态,那浪费太多空间了。即使用byte存储状态,也浪费了五个bit空间。想想读写锁的设计,也是出自Doug Lea之手。比如:
ThreadPoolExecutor的核心参数以及线程池状态到底是咋玩的?

2.3、线程池的状态是怎么计算的?

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程池状态,只有RUNNING状态是负数。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

我们知道在java中 int 类型占用4个字节32位存储, 上述的几种状态:
底层存储二进制为:

1111 1111 1111 1111 1111 1111 1111 1111(-1)
0000 0000 0000 0000 0000 0000 0000 0000(0)
0000 0000 0000 0000 0000 0000 0000 0001(1)
0000 0000 0000 0000 0000 0000 0000 0010(2)
0000 0000 0000 0000 0000 0000 0000 0011(3)

左移<<COUNT_BITS,COUNT_BITS = Integer.SIZE - 3 也就是 COUNT_BITS = 29,说明用32位的前3位存储线程池的状态
后29位存储线程池中当前线程的个数, << COUNT_BITS后,变为下面的二进制:

1110 0000 0000 0000 0000 0000 0000 0000
0000 0000 0000 0000 0000 0000 0000 0000
0010 0000 0000 0000 0000 0000 0000 0000
0100 0000 0000 0000 0000 0000 0000 0000
0110 0000 0000 0000 0000 0000 0000 0000

我们可以看到,前三位存储的是标识线程状态的二进制

如何获取线程池状态的?

private static int runStateOf(int c) { return c & ~CAPACITY; }

其中CAPACITY = (1 << COUNT_BITS) - 1 转化为二进制为:
0001 1111 1111 1111 1111 1111 1111 1111
取反"~"后(也就是将前3位全部变为1,后面全部变为0),二进制为:
1110 0000 0000 0000 0000 0000 0000 0000
接下来,传入的ctl变量和~CAPACITY&操作,只会保留ctl变量的前3位变量,后29位变量全部为0

例如:一个标识当前状态为STOP状态的线程池和当前活跃线程数为2的ctl变量为:
0010 0000 0000 0000 0000 0000 0000 0011
和上述得到的1110 0000 0000 0000 0000 0000 0000 0000(~CAPACITY)做&操作后得到:

0010 0000 0000 0000 0000 0000 0000 0010
&
1110 0000 0000 0000 0000 0000 0000 0000
=
0010 0000 0000 0000 0000 0000 0000 0000    

0010 0000 0000 0000 0000 0000 0000 0000 和上述分析的STOP的状态的二进制相同! 即获得了当前线程的状态!

如何获取当前线程池活跃线程数的?

private static int workerCountOf(int c) { return c & CAPACITY; }

上述知道CAPACITY为:0001 1111 1111 1111 1111 1111 1111 1111

例如:一个标识当前状态为STOP状态的线程池和当前活跃线程数为2的ctl变量为:

0010 0000 0000 0000 0000 0000 0000 0010
&    
0001 1111 1111 1111 1111 1111 1111 1111
=
0000 0000 0000 0000 0000 0000 0000 0010

所以结果是二进制10,也就是代表当前线程池中活跃线程数量为2个!

2.4、为什么要用-1代表RUNNING状态?

既然采取3bit作为线程池状态,那么也就是有2的3次幂=8种方式,为什么要采取-1作为RUNNING呢?

因为作者是Doug Lea,Doug Lea又怎么了?Doug Lea是并发大神,神中神。

这样设计的话只需要判断state是不是小于0,如果小于0就代表是RUNNING状态,简单粗暴。所以看到if (state < 0)这种类似的代码就是说如果是运行状态的话,就进入判断。所以线程池五种状态用-1这个唯一的一个负数来表示RUNNING状态。比如如下源码:

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

SHUTDOWN是0,所以判断是不是在运行中只需要判断state是不是小于0即可!

2.5、new一个线程池,他的活跃线程数是多少?怎么计算的?

线程池的状态和活跃线程数量都是通过ctl来管理的,高 3位代表状态,低29位代表活跃线程数量。所以看看ctl默认是多少就好啦。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

哦嚯,发现是通过ctlOf来计算的,继续看ctlOf的源码:

// rs标识线程池当前状态,wc为work count标识当前工作线程的数量。
private static int ctlOf(int rs, int wc) { return rs | wc; }

上述传入的是ctlOf(RUNNING, 0) ,当前状态为RUNING也就是1110 0000 0000 0000 0000 0000 0000 0000 ,wc为0,也就是当前工作线程数为0,其二进制为0000 0000 0000 0000 0000 0000 0000 0000 ,做"|"或操作,即

1110 0000 0000 0000 0000 0000 0000 0000 
| 
0000 0000 0000 0000 0000 0000 0000 0000
=
1110 0000 0000 0000 0000 0000 0000 0000

上述得到的结果1110 0000 0000 0000 0000 0000 0000 0000就标识,当前线程池状态为RUNNING(-1 << COUNT_BITS),线程池活跃线程个数为0!

2.6、线程池状态是怎么转换的?

  • 【RUNNING】->【SHUTDOWN】:调用shutdown()方法。
  • 【RUNNING/SHUTDOWN】->【STOP】:调用shutdownNow()方法。
  • 【SHUTDOWN】->【TIDYING】:队列和线程池都是空的。也就是说shutdown()后任务都执行完毕的时候会变成TIDYING。
  • 【STOP】->【TIDYING】:线程池为空。也就是说执行shutdownNow()方法后,所有线程都停止后状态会变为TIDYING。
  • 【TIDYING】->【TERMINATED】:钩子函数terminated()执行完成的时候。

【微信公众号】
ThreadPoolExecutor的核心参数以及线程池状态到底是咋玩的?

上一篇:关于位运算、位图的一些用法


下一篇:1619523953