动态线程池
本篇提炼了美团动态线程池博客的核心内容,并结合自我理解进行了简单实现。
链接:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
为什么用线程池
好处
-
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
-
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
-
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
-
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
解决的问题
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
“池化”思想
- 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
- 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
- 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。
应用
1、快速响应用户请求
2、快速处理批量任务
基础
一、线程池总体设计和七大参数
Executor:顶层接口,提供思想:将任务提交和任务执行进行解耦,用户提供Runnable对象,Executor框架完成线程的调配和任务的执行
ExecutorService:增加一些能力
(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
(2)提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService:上层抽象类
ThreadPoolExecutor:
(1)维护自身的生命周期
(2)管理线程和任务
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
// corePoolSize 核心线程数 -> 该线程池一直在运行的线程 -> 并非不可回收,设置allowCoreThreadTimeOut
// maximumPoolSize 最大线程数 -> 该线程池最大可同时运行的线程
// keepAliveTime 空闲线程存活时间
// unit 空闲线程存活时间单位
// workQueue 达到核心线程数后(准确来说是没有空闲线程时)新增任务放入该工作队列 -> 生成者消费者模型
// threadFactory 生成线程的线程工厂 -> 可自定义线程池名
// handler 拒绝策略 -> 达到最大线程数且阻塞队列已满时对新增任务的处理(不一定拒绝!)
工作队列:
拒绝策略:
二、ThreadPoolExecutor
2.1、ThreadPoolExecutor运行机制
线程池:生产者消费者模型,线程和任务解耦,缓冲任务,复用线程
任务管理:生产者
(1)直接申请线程执行该任务;
(2)缓冲到队列中等待线程执行;
(3)拒绝该任务。
线程管理:消费者
2.2、ThreadPoolExecutor的运行状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 运行状态(runState):高3位
// 线程数量 (workerCount):低29位
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
2.3、ThreadPoolExecutor线程池回收线程机制
① Hash表持有线程的引用,通过添加引用、移除引用来控制线程的生命周期
② Worker继承AQS,使用AQS实现独占锁(非可重入锁ReentrantLock),通过不可重入来反映线程的执行状态。
1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
2.如果正在执行任务,则不应该中断线程。
3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
三、任务管理
3.1、任务执行机制
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
3.2、任务申请机制(getTask)
1、任务直接由新创建的线程执行(线程初始创建)
2、线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行
四、线程管理
4.1、Worker线程管理
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
4.2、worker线程增加(addWorker)
4.3、Worker线程执行任务(runWorker)
1.while循环不断地通过getTask()方法获取任务。
2.getTask()方法从阻塞队列中取任务。
3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
4.执行任务。
5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
4.4、Worker线程回收
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
线程池的问题
参数设置的合理性:
三个参数:
corePoolSize:核心线程数
maximumPoolSize:最大线程数
workQueue:工作队列
动态线程池设计
一、目标
① 参数动态调整
② 监控(任务和线程池)
③ 报警(任务和线程池)
④ 权限和日志。。。
二、参数动态调整
线程池目前提供5个参数供我们修改,上图为修改corePoolSize的流程图
工作队列的长度没有提供修改入口,我们需要通过线程池获取到工作队列自行修改
工作队列的长度字段可能为final类型,我们可以自定义自己的工作队列使其可以修改
二、监控
线程池监控
线程池提供了部分api供我们查看线程池的实时状态,例:当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等
活跃度:activeCount/maximumPoolSize
Reject异常情况
线程池任务监控
任务实际执行频率和时长 -> 任务级别的埋点 -> 不同业务不同线程名称
三、报警
线程池活跃度
任务的执行Transaction(频率、耗时)
Reject异常
线程池内部统计信息
等等