线程池原理——总结篇
复用已创建的线程,避免频繁创建/销毁线程的开销
控制并发的数据,避免并发数量过多消耗资源
方便对线程做统一的管理
ThreadPoolExecutor核心构造方法
- int corePoolSize:池内核心线程数最大值
- int maxmunpoolSize:线程总最大数
- long keepAliveTime:非核心线程闲置时间
- TimeUnit unit:keepAliveTime的时间单位
- BlockingQueue workQueue:阻塞队列,待执行的Runnable任务对象
- LinkedBlockingQueue:链式阻塞式队列,可指定大小,默认大小为Integer.MAX_VALUE
- ArrayBlockingQueue:数组阻塞式队列,需要指定大小
- SynchronousQueue:同步队列,容量为0,每个put操作必须等待一个take操作,反之亦然
- DelayQueue:延迟队列,队列中的元素只有当其指定的时间到了才可以被获得
- ThreadFactory threadFactory:非必须,统一在创建线程时设置一些参数,如是否守护线程、优先级等
- RejectedExecutionHandler handler:非必须,拒绝处理策略
- ThreadPoolExecutor.AbortPolicy:默认拒绝策略,丢弃任务并抛异常
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,不抛异常
- ThreadPoolExecutor.DiscardoldestPolicy:丢弃阻塞队列中的头部,然后重试,若失败,重复此过程
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程去处理该任务
线程池状态及任务处理流程
-
线程池状态信息
线程池本身具有一个调度线程,用于处理整个线程池的事务和任务,如创建/销毁线程、阻塞队列管理等,因此线程池有自身的状态信息
ThreadPoolExecutor类中定义了一个volatile int变量runState表示线程池状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED- 创建后处于RUNNING
- 调用shutdown()方法,进入SHUTDOWN,此时不接受新任务,清除一些空闲worker,等待阻塞队列的任务完
- 调用shutdownNow()方法,进入STOP状态,不接受任务、中断所有线程,丢弃阻塞队列中所有任务,此时poolSieze=0,阻塞队列size=0,读取阻塞队列为null
- 所有任务终止,ctl(AtomicInteger类型)记录的线程数为0,线程池状态变为TIDYING,接着执行terminated()方法
- 执行完后进入TERMINATED状态
-
处理流程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 获得线程池状态 // 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执⾏任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执⾏拒绝策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2 线程池处于running状态,但是没有线程,则创建线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.如果放⼊workQueue失败,则创建⾮核⼼线程执⾏任务, // 如果这时创建⾮核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执⾏拒绝策略。 else if (!addWorker(command, false)) reject(command); }
- 空指针检测,阻塞队列中也不允许出现空值
- 线程数<corePoolSize时,调⽤addWorker()创建新核心线程执行任务
- 线程数>=corePoolSize时,尝试添加至阻塞队列,若失败尝试使用非核心线程执行任务,如果执行失败就触发拒绝策略
如何实现线程复用及何时回收线程
ThreadPoolExecutor在创建线程时,会把线程封装成worker,并放入工作线程组,worker反复从阻塞队列中读任务,实现线程复用
-
线程复用原理
- 在处理流程中,通过execute(Runnable task)方法执行任务时,会调用addWorker()方法
- execute(Runnable task)方法先判断线程数是否超出阈值,超出返回false
- 否则创建一个Worker对象(Worker类实现了Runnable接口,是一个线程任务,其run方法调用了runWorker(Worker w)方法),并初始化一个Thread对象,然后启动,此时需要利用ReentrantLock加上全局锁,更新线程次状态信息
- 在runWorker(Worker w)方法内,只要当前任务不为空或不断读取阻塞队列中的任务不为空时,就调用task.run()方法执行任务,从而实现线程的复用(任务执行的前后也加全局锁)
- 因此何时回收线程的关键就在于对阻塞队列中的任务获取(getTask()方法),当getTask()方法不返回null时,线程就不会退出,实现复用
-
回收线程的步骤
- 在getTask()方法中首先会判断线程池状态,及阻塞队列是否为空,判断是否返回null(见线程池状态)
- getTask()方法会判断线程数是否大于核心数,如果此时阻塞队列为空,则递减worker数量(cas操作),并返回null,外部接收null值的线程则被会回收,直到当前线程小于等于核心线程数
- 在getTask()方法中,其底层实现是workQueue.poll(keepAliveTime,TimeUnit.NANOSECONEDS)和workQueue.take(),前者满足时间时会返回null,后者不允许返回null(阻塞并被挂起)
- 如果阻塞队列不为空时,且allowCoreThreadTimeOut属性为false(默认值),存在线程getTask()方法则调用workQueue.take()方法,核心线程会阻塞在这并被挂起
- 如果allowCoreThreadTimeOut属性置为true,keepAlive时间内无任务可以返回null,实现线程的回收