线程池的作用
线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间,在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险;另外,线程自身无法解决在服务器负载过大的时候让新的线程等待或友好的拒绝服务的问题。所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
线程池的作用包括:
- 利用线程池管理并复用线程、控制最大并发数;
- 实现任务线程队列缓存策略和拒绝机制;
- 实现某些与时间相关的功能,如定时执行,周期执行等;
- 隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大,因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔离开,避免个服务线程相互影响。
ThreadPoolExecutor
ThreadPoolExecutor顶层接口是Executor,ExecutorService接口继承了Executor接口,定义了 管理线程任务的方法;
ExecutorService的抽象类AbstractExecutorService提供了submit()、invokeAll()等部分方法的实现;
ThreadPoolExecutor实现了最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
详细参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//其余代码
}
-
corePoolSize 核心线程数。如果等于0,任务执行完之后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值设置过大会浪费资源,设置过小会导致线程频繁的创建销毁线程
-
maximumPoolSize 最大线程数。从构造函数中第一个if可以看出,该值必须大于或等于1。如果待执行的线程数大于此值,则需要借助第五个参数的帮助,缓存在队列中。如果maximumPoolSize 和corePoolSize 相等,即是固定大小线程池
-
keepAliveTime 线程空闲时间。当空闲时间达到keepAliveTime值时,线程会被销毁,知道只剩下corePoolSize个线程为止,避免浪费内存和句柄资源。默认情况下,当线程数大于corePoolSize时,keepAliveTime才会起作用。但是当ThreadPoolExecutor的allowCoreThreadTimeOut变量设置为True时,核心线程超时后也会被回收
-
TimeUnit keepAliveTime的时间单位,通常时间单位是TimeUnit.SECONDS。
-
workQueue 缓存队列。当请求的线程数大于corePoolSize时,线程进入BlockingQueue阻塞队列。
-
threadFactory 线程工厂。用来生产一组相同任务的线程。线程池的命名时通过给这个factory增加组名前缀来实现的。
-
handler 拒绝策略。当workQueue 的任务缓存区到达上限,并且活动线程数大于maximumPoolSize的时候,线程池通过该策略处理请求。
友好的拒绝策略可以是以下三种:1、保存到数据库进行削峰填谷。在空闲时再提取出来执行 2、转向某个提示页面 3、打印日志
线程池运行状态
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
线程池状态源码:
// Integer共有32位,最右边29位标识工作线程数,最左边三位表示线程池状态
// 三个二进制位可以表示从0到7的8个不同数值
private static final int COUNT_BITS = Integer.SIZE - 3;
// 类似于子网掩码,用于位的与运算
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的状态用高三位表示,其中包括了符号位。五种 状态的十进制按从小到大的顺序依次为:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这样可以根据值的大小来确定线程池的状态,例如isRunning的判断:
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
五种状态介绍:
- RUNNING 能够接受新的任务,以及对已添加的任务进行处理
线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0! - SHUTDOWN 不接受新任务,但能处理已添加任务
调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。 - STOP 不接受新任务,不处理已添加任务,并且会终端正在处理的任务
调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。 - TIDYING 当所有的任务已终止,ctl记录的任务数量为0,线程池会变为TIDYING 状态,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。 - TERMINATED 线程池彻底终止,就变成TERMINATED状态。
线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
线程池任务处理流程
所有的任务都是由execute方法完成的
- 检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务
- 如果workerCount<corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount>=corePoolSize,且线程池内的缓存队列未满,则将任务添加到队列中
此处在成功加入到队列后,还会重新检查一下是否应该添加一个线程(因为在上次检查之后可能已有线程死掉了),或者在进入这个方法之后线程池关闭了 - 如果workerCount>=corePoolSIze && wokerCount<maximumPoolSize,且线程池内缓存队列已满,则创建并启动一个线程来执行新提交的任务
- 如果workerCount >= maximumPoolSize,并且线程池内的缓存队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
- 线程池中存活的线程执行完当前任务后,会在循环中反复从BlockingQueue队列中获取任务来执行
execute源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 包含线程数以及线程池状态的值
int c = ctl.get();
// 如果工作线程数小于核心线程数,则创建线程任务并执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果创建失败,防止外部已经在线程池中加入新的任务,重新获取
c = ctl.get();
}
// 只有线程处于RUNNING状态,才执行 置入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 这里做了double check,如果此时线程池不是RUNNING状态,将加入队列的任务一处
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果之前的线程已经被消费完,新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 核心池和队列都满了,尝试创建一个新的线程
else if (!addWorker(command, false))
// 如果创建失败,则进行拒绝策略
reject(command);
}
workQueue 缓存队列
JDK 提供了四种工作队列
- SynchronousQuene *队列,直接提交队列
不缓存任务的阻塞队列,没有容量,生产者放入一个任务必须等到消费者取出这个任务。
如果线程数量小于 maximumPoolSize,则创建新的进程,如果达到 maximumPoolSize ,则根据设置的 handler 执行拒绝策略。
这种方式提交的任务不会被缓存起来,而是会被马上执行
public class Test {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable{
public ThreadTask() {}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
执行结果:
- ArrayBlockingQueue 有界队列,
基于数组的有界队列,当线程池中线程数量达到 corePoolSize 后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已满,则创建一个新线程,如果线程数量达到 maximumPoolSize,则会执行拒绝策略
public class Test {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<8;i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable{
public ThreadTask() {}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
执行结果:
- LinkedBlockingQuene *阻塞队列
基于链表的*阻塞队列(其实最大容量为 Interger.MAX),按照 FIFO 排序;当线程池中线程数量达到 corePoolSize 后,再有新任务进来,会一直存入该队列,而不会去创建新线程,所以使用该工作队列时,参数 maximumPoolSize 其实是不起作用的。
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
执行结果:
- PriorityBlockingQueue 优先级任务队列
使用该队列,需要实现Comparable接口,指定优先级的比较策略;它属于*队列,与LinkedBlockingQuene 一样,当线程池中线程数量达到 corePoolSize 后,再有新任务进来,会一直存入该队列,而不会去创建新线程
public class Test {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<8;i++) {
pool.execute(new ThreadTask(i));
}
}
}
class ThreadTask implements Runnable,Comparable<ThreadTask>{
private int priority;
public ThreadTask() {}
public ThreadTask(int priority) {
this.priority = priority;
}
public void run() {
System.out.println(Thread.currentThread().getName() + "===> priority is : " + this.priority);
}
@Override
public int compareTo(ThreadTask o) {
return this.priority > o.priority ? -1 : 1;
}
}
执行结果:
除此之外还有一些队列:
- DelayedQueue *队列。用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间的控制。
- LinkedTransferQueue *队列。由链表结构组成的*阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。它采用一种预占模式,意思就是线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后线程被等待在这个节点上,后面任务入队时发现有一个元素为null的节点,任务就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的线程取走元素,从调用的方法返回。
threadFactory 线程工厂
线程池中线程就是通过 ThreadPoolExecutor 中的 ThreadFactory 线程工厂创建的。通过自定义 ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,例如设置线程命名:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "testThread-" + new Date());
}
},
new ThreadPoolExecutor.AbortPolicy());
运行效果:
handler 拒绝策略
在ThreadPoolExecutor中提供了四个公开的静态内部类:
- AbortPolicy(默认):丢弃任务并抛出RejetedExecutionException异常
- DiscardPolicy:丢弃任务,但不抛出异常
- DiscardOldestPolicy:抛弃队列中最早的任务,然后将当前任务加入到队列
- CallerRunsPolicy:调用任务的Run方法绕过线程池直接执行
拿CallerRunsPolicy来分析一下:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
可以看到有两个线程直接走的main
看下源码:
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
可以看出内置的拒绝策略实现了 RejectedExecutionHandler,我们也可以自己定义一个拒绝策略:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "进入个性化拒绝策略");
r.run();
}
});
运行效果:
Executors中的几种线程池
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool将创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,那么就会根据keepAliveTime来回收部分空闲线程,默认是60秒;如果任务数增加,再次创建出新线程处理任务。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就会创建一个线程,直到达线程池的最大数量;输入的参数即是固定线程数,即是核心线程数也是最大线程数,不存在空闲线程,所以keepAliveTime等于0,该线程池配置的缓存队列是*的,也就是说,如果我的线程都阻塞后,会有大量请求在缓存队列中不会被执行。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor是一个单线程线程池,相当于单线程串行执行所有任务,保证按任务的提交顺序依次执行
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newScheduledThreadPool创建了一个固定长度的线程池,而且使用DelayedWorkQueue以延迟或定时或周期的方式来执行任务,类似于Timer。可应用于重发机制。
newWorkStealingPool
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newWorkStealingPool是jdk8引入,创建持有足够线程的线程池支持给定的并发量,并通过多个队列减少竞争,该方法将cpu数量设置为默认并发量。
可以分析下ForkJoinPool的构造函数:
/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private static int checkParallelism(int parallelism) {
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
return parallelism;
}
构造函数上也有大段的注释解释了每个参数的意思,其中
- parallelism 并发数量,默认是java.lang.Runtime#availableProcessors
- asyncMode 并发模式,优先规则优先处理其自身队列按照LIFO或者FIFO的任务,true为FIFO,false为LIFIO
上述几种线程池的相关类图:
如何定义线程池参数
- 线程数:
cpu密集型: CPU密集型任务应配置尽可能少的线程,如配置核心数为 N+1 个线程的线程池。
IO密集型:IO密集型的任务并不是一直都在执行任务,存在读写阻塞等待,因此应配置较多的线程,如核心数 2N 个,最大线程数 25N 个
混合型:将任务分为 CPU 密集型和 IO 密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整
大量的耗时任务执行的时候,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先。这个时候应该设置较大队列去缓冲并发任务,调整合适的核心线程数,避免过多引发 线程上下文切换频繁导致的资源消耗 - 缓存队列:
建议使用有界队列,并可以根据需要设置大一点。基于链表的LinkedBlockingQueue比基于数组的ArrayBlockingQueue吞吐量高,因为增删多,需要内存大。有界队列能增加系统的稳定性和预警能力,可以根据需要设置大一点,比如几千。有时会遇到任务执行缓慢的时候,为了避免任务被拒绝,可以适当将队列设置大一些。 - 拒绝策略
默认采用的是 AbortPolicy 拒绝策略,直接在程序中抛出RejectedExecutionException;如果使用默认策略,可以在程序捕获异常中对任务进行处理
另外,还可以自定义拒绝策略,只需要实现RejectedExecutionHandler接口;如果任务不是特别重要,使用 DiscardPolicy 和 DiscardOldestPolicy 拒绝策略将任务丢弃也是可以的。
总结
- 合理设置各类参数,应根据实际业务场景来设置合理的工作线程数
- 线程资源必须通过线程池提供,不允许在程序中自行显示创建线程
- 创建线程或线程池时需要指定有意义的线程名称,方便出错时回溯
线程池尽力不要使用Executors,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能更加明确线程池的运行规则,规避资源耗尽的风险
本篇文章参考资料:
《码出高效-Java开发手册》7.4章节
作者:日常更新——Java线程池ThreadPoolExecutor类
作者:杨七 ——Java线程池总结