一、前言
线程池是开发中绕不开的一个知识点 。
对于移动开发而言,网络框架、图片加载、AsyncTask、RxJava, 都和线程池有关。
正因为线程池应用如此广泛,所以也成了面试的高频考点。
我们今天就来讲讲线程池的基本原理和周边知识。
先从线程的生命周期开始。
二、线程生命周期
线程是程序执行流的最小单元。
Java线程可分为五个阶段:
- 新建(New): 创建Thread对象,并且未调用start();
- 就绪(Runnable): 调用start()之后, 等待操作系统调度;
- 运行(Running): 获取CPU时间分片,执行 run()方法中的代码;
- 阻塞(Blocked): 线程让出CPU,进入等待(就绪);
- 终止(Terminated): 自然退出或者被终止。
线程的创建和销毁代价较高,当有大量的任务时,可复用线程,以提高执行任务的时间占比。
如上图,不断地 Runnable->Runing->Blocked->Runnable, 就可避免过多的线程创建和销毁。
此外,线程的上下文切换也是开销比较大的,若要使用线程池,需注意设置合理的参数,控制线程并发。
三、ThreadPoolExecutor
JDK提供了一个很好用的线程池的封装:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程大小
maximumPoolSize:线程池最大容量(需大于等于corePoolSize,否则会抛异常)
keepAliveTime:线程执行任务结束之后的存活时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略
线程池中有两个任务容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
前者用于存储Worker,后者用于缓冲任务(Runnable)。
下面是execute方法的简要代码:
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
// 若workQueue已满,offer会返回false
if (isRunning(c) && workQueue.offer(command)) {
// ...
} else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize))
return false;
Worker w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
t.start();
}
一个任务到来,假设此时容器workers中Worker数的数量为c,则
- 1、当c < corePoolSize时,创建Worker来执行这个任务,并放入workers;
-
当c >= corePoolSize时,
- 2、若workQueue未满,则将任务放入workQueue
-
若workQueue已满,
- 3、若c < maximumPoolSize,创建Worker来执行这个任务,并放入workers;
- 4、若c >= maximumPoolSize, 执行拒绝策略。
很多人在讲线程池的时候,干脆把workers说成“线程池”,将Worker和线程混为一谈;
不过这也无妨,能帮助理解就好,就像看到一杯水,说“这是水”一样,很少人会说这是“杯子装着水”。
Worker和线程,好比汽车和引擎:汽车装着引擎,汽车行驶,其实是引擎在做功。
Worker本身实现了Runnable,然后有一个Thread和Runnable的成员;
构造函数中,将自身(this)委托给自己的成员thread;
当thread.start(), Worker的run()函数被回调,从而开启 “执行任务-获取任务”的轮回。
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
}
}
当线程执行完任务(task.run()结束),会尝试去workQueue取下一个任务,
如果workQueue已经清空,则线程进入阻塞态:workQueue是阻塞队列,如果取不到元素会block当前线程。
此时,allowCoreThreadTimeOut为true, 或者 n > corePoolSize,workQueue等待keepAliveTime的时间,
如果时间到了还没有任务进来, 则退出循环, 线程销毁;
否则,一直等待,直到新的任务到来(或者线程池关闭)。
这就是线程池可以保留corePoolSize个线程存活的原理。
从线程的角度,要么执行任务,要么阻塞等待,或者销毁;
从任务的角度,要么马上被执行,要么进入队列等待被执行,或者被拒绝执行。
上图第2步,任务进入workQueue, 如果队列为空且有空闲的Worker的话,可马上得到执行。
关于workQueue,常用的有两个队列:
- LinkedBlockingQueue(capacity):
传入capacity(大于0), 则LinkedBlockingQueue的容量为capacity;
如果不传,默认为Integer.MAX_VALUE,相当于无限容量(不考虑内存因素),多少元素都装不满。
- SynchronousQueue
除非另一个线程试图获取元素,否则不能添加元素。
四、 ExecutorService
为了方便使用,JDK还封装了一些常用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
类型 | 最大并发 | 适用场景 |
---|---|---|
newFixedThreadPool | nThreads | 计算密集型任务 |
newSingleThreadExecutor | 1 | 串行执行的任务 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任务 |
newScheduledThreadPool | Integer.MAX_VALUE | 定时任务,周期任务 |
newSingleThreadExecutor 其实是 newFixedThreadPool的特例 (nThreads=1),
写日志等任务,比较适合串行执行,一者不会占用太多资源,二者为保证日志有序与完整,同一时间一个线程写入即可。
众多方法中,newCachedThreadPool() 是比较特别的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 为 SynchronousQueue。
结合上一节的分析:
当一个任务提交过来,由于corePoolSize = 0,任务会尝试放入workQueue;
如果没有线程在尝试从workQueue获取任务,offer()会返回false,然后会创建线程执行任务;
如果有空闲线程在等待任务,任务可以放进workQueue,但是放进去后马上就被等待任务的线程取走执行了。
总的来说,就是有空闲线程则交给空闲线程执行,没有则创建线程执行;
SynchronousQueue类型workQueue并不保存任务,只是一个传递者。
所以,最终效果为:所有任务立即调度,无容量限制,无并发限制。
这样的特点比较适合网络请求任务。
OkHttp的异步请求所用线程池与此类似(除了ThreadFactory ,其他参数一模一样)。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
五、 任务并发的估算
一台设备上,给定一批任务,要想最快时间完成所有任务,并发量应该如何控制?
并发量太小,CPU利用率不高;
并发量太大,CPU 满负荷,但是花在线程切换的时间增加,用于执行任务的时间反而减少。
一些文章提到如下估算公式:
M:并发数;
C:任务占用CPU的时间;
I:等待IO完成的时间(为简化讨论,且只考虑IO);
N:CPU核心数。
代入特定参数验证这条公式:
1、比方说 I 接近于0,则M≈N,一个线程对应一个CPU,刚好满负荷且较少线程切换;
2、假如 I=C,则M = 2N,两个线程对应一个CPU,每个线程一半时间在等待IO,一半时间在计算,也是刚好。
遗憾的是,对于APP而言这条公式并不适用:
-
任务占用CPU时间和IO时间无法估算
APP上的异步任务通常是碎片化的,而不同的任务性质不一样,有的计算耗时多,有的IO耗时多;
然后同样是IO任务,比方说网络请求,IO时间也是不可估计的(受服务器和网速影响)。
-
可用CPU核心可能会变化
有的设备可能会考虑省电或者热量控制而关闭一些核心;
大家经常吐槽的“一核有难,九核围观”映射的就是这种现象。
虽然该公式不能直接套用来求解最大并发,但仍有一些指导意义:
**IO等待时间较多,则需要高的并发,来达到高的吞吐率;
CPU计算部分较多,则需要降低并发,来提高CPU的利用率。**
换言之,就是:
做计算密集型任务时控制并发小一点;
做IO密集型任务时控制并发大一点。
问题来了,小一点是多小,大一点又是多大呢?
说实话这个只能凭经验了,跟“多吃水果”,“加盐少许”一样,看实际情况而定。
比如RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者默认情况下为最大并发为CPU核心数,后者最大并发为Integer.MAX_VALUE(相当于不限制并发)。
可能是作者也不知道多少才合适,所以干脆就不限制了。
这样其实很危险的,JVM对进程有最大线程数限制,超过则会抛OutOfMemoryError。
六、总结
回顾文章的内容,大概有这些点:
- 介绍了线程的生命周期;
- 从线程池的参数入手,分析这些参数是如何影响线程池的运作;
- 列举常用的ExecutorService,介绍其各自特点和适用场景;
- 对并发估算的一些理解。
文章没有对Java线程池做太过深入的探讨,而是从使用的角度讲述基本原理和周边知识;
第二节有结合关键代码作简要分析,也是点到为止,目的在于加深对线程池相关参数的理解,
以便在平时使用线程池的时候合理斟酌,在阅读涉及线程池的开源代码时也能“知其所以然”。