文章目录
一、基本概念
ThreadPoolExecutor类实现了ExecutorService接口和Executor接口,可以设置线程池corePoolSize,最大线程池大小,BlockingQueue,AliveTime,拒绝策略等。
常用构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
参数 | 含义 |
---|---|
corePoolSize | 线程池维护线程的最少数量 |
maximumPoolSize | 线程池维护线程的最大数量 |
keepAliveTime | 线程池维护线程所允许的空闲时间 |
unit | 线程所允许的空闲时间单位 |
workQueue | 线程池所使用的缓冲队列,属于BlockingQueue的具体实例 |
handler | 线程池对拒绝任务的处理策略 |
二、缓冲队列BlockingQueue
类型 | 说明 |
---|---|
ArrayBlockingQueue | 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序 |
LinkedBlockingQueue | 一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用了这个队列 |
SynchronousQueue | 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 |
PriorityBlockingQueue | 一个具有优先级的无限阻塞队列 |
三、饱和策略RejectedExecutionHandler
类型 | 说明 |
---|---|
AbortPolicy | 任务队列满后,新的任务将被丢弃,并抛出异常 |
CallerRunsPolicy | 只用调用者所在线程来运行任务 |
DiscardOldestPolicy | 丢弃队列里最近的一个任务,并执行当前任务 |
DiscardPolicy | 不处理,直接丢弃任务 |
… | 也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务 |
四、线程池的工作方式
1、如果此时线程池中的数量小于corePoolSize,即便线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
2、如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,则创建新的线程来处理被添加的任务。
4、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。
处理任务的优先级为:
核心线程corePoolSize --> 任务队列workQueue --> 最大线程maximumPoolSize --> 使用handler饱和策略;
五、代码演示
核心线程数:3,最大线程数:5,缓冲队列:10,饱和策略:AbortPolicy
package com.wzl.executor;
/**
* 工作类
*/
public class Worker implements Runnable {
int id;
public Worker(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println("Index: " + id + " is running , Thread Name:" + Thread.currentThread().getName());
try {
Thread.sleep(6000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.wzl.executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Test {
private static final Logger log = LoggerFactory.getLogger(Test.class);
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
5,
1 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.AbortPolicy()//饱和策略,AbortPolicy:任务队列满后,新的任务将被丢弃,并抛出异常
);
for (int i = 0; i < 100; i++) {
try {
threadPoolExecutor.execute(new Worker(i));
} catch (Exception e) {
log.error("发生异常,当前index:" + i, e);
break;
}
}
threadPoolExecutor.shutdown();
}
}
运行结果 & 解答
- Index 0、1、2 交给corePoolSize,正在执行
- Index 3 ~ 12 进入缓冲队列(size == 10)进行等待
- Index 13、14 交给maximumPoolSize扩展的2个线程进行处理,正在执行
- Index 15 基于饱和策略,进行处理,这里是拒绝任务,抛出异常
- Index 3 ~ 12 进入缓冲队列(size == 10)进行等待,有空闲线程后进行处理