int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 线程池状态 >= SHUTDOWN
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 内层自旋
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 工作中的线程数大于线程池的容量,或者已经大于等于核心线程数,或者大于等于最大线程数
// core为true,表示要创建核心线程,false表示要创建非核心线程
// 为什么大于等核心线程数的时候要返回false,因为要添加到缓冲队列,或者创建非核心线程来执行,不能创建核心线程了
return false;
if (compareAndIncrementWorkerCount(c))
// 以CAS的方式尝试把线程数加1
// 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程
// 成功后跳出内层自旋
break retry;
// CAS失败,再次获取ctl,检查线程池状态
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 线程池状态被改变了,从外层自旋开始再次执行之前的逻辑
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 可以看到两层自旋 + CAS,仅仅是为了把线程池中的线程数加1,还没有新建线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 把task包装成Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取锁之后,再次检查线程池的状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
// 检查线程状态
throw new IllegalThreadStateException();
// 添加到worders
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 维护largestPoolSize变量
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
if (workerAdded) {
// 添加成功
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 执行worker的线程启动失败
addWorkerFailed(w);
}
return workerStarted;
}
可以看到`addWorker`方法前一部分,用了外层**自旋**判断线程池的状态,内层**自旋 + CAS**给线程池中的线程数加1。后半部分用了`ReentrantLock`保证创建`Worker`对象,以及启动线程的线程安全。一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法),因为每两次之间,线程池的状态都有可能被改变。
### runWorker
前文在介绍`Worker`内部类时说过,`Worker`会把自己传递给`ThreadFactory`创建的线程执行,最终执行`Worker`的`run`方法,而`Worker`类的`run`方法只有一行代码:
runWorker(this);
所以接下来看看`runWorker`方法是如何实现了
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许外部中断
w.unlock(); // allow interrupts
// 记录worker是不是异常退出的
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 自旋,如果task不为空,或者能从缓冲队列(阻塞队列)中获取任务就继续执行,不能就一直阻塞
// 加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果线程池正在停止,并且当前线程没有被中断,就中断当前线程
wt.interrupt();
try {
// 钩子函数,处理task执行前的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,处理task执行后的逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 运行到这里,说明worker没有异常退出
completedAbruptly = false;
} finally {
// 自旋操作被打断了,说明线程需要被回收
processWorkerExit(w, completedAbruptly);
}
}
第10行代码中,task为null时,会通过`getTask()`方法从缓冲队列中取任务,因为缓冲队列是阻塞队列,所以如果获取不到任务会一直被阻塞,接下来看看`getTask`方法的内部实现
### getTask
`getTask`用于**阻塞**式的从缓冲队列中获取任务。
private Runnable getTask() {
// 线程是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 自旋
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程池终止了,或者线程池停止了,且缓冲队列中没有任务了
// 自旋 + CAS方式减少线程计数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 根据allowCoreThreadTimeOut参数来判断,要不要给核心线程设置等待超时时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 当前线程数大于了maximumPoolSize(因为maximumPoolSize可以动态修改)或者当前线程设置了超时时间且已经超时了
// 且线程数大于1或者缓冲队列为空
// 这个条件的意思就是:当前线程需要被回收
if (compareAndDecrementWorkerCount(c))
// 返回null后,上层runWorker方法中断循环,执行processWorkerExit方法回收线程
return null;
continue;
}
try {
// 从阻塞队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 成功获取任务
return r;
// 没有获取到任务,超时
timedOut = true;
} catch (InterruptedException retry) {
// 线程被中断,重试
timedOut = false;
}
}
}
理解该方法的前提,是要理解**阻塞队列**提供的阻塞式API。
这个方法重点关注两点:
* 从缓冲队列取任务时,`poll`非阻塞,`take`阻塞,调用哪个由当前线程需不需要被回收来决定
* 该方法返回null之后,上层方法会回收**当前线程**
除了这几个核心方法之外,往线程池提交任务还有一个方法叫`submit`
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
`submit`方法可以接收线程池返回的结果,也就是`Futrue`对象,可以接收`Runnable`对象和`Callable`对象。
至于`Future`、`FutureTask`、`Runnable`、`Callable`之间的关系此处不再赘述。
至此`ThreadPoolExecutor`的核心方法的源码以及执行逻辑已经讲解完毕,再来看一些非核心方法,了解一下即可
* `public void shutdown()`:关闭线程池,已经提交过的任务还会执行(线程池中未运行完毕的,缓冲队列中的)
* `public List<Runnable> shutdownNow()`:停止线程池,试图停止正在执行的任务,暂停缓冲队列中的任务,并且返回
* `public void allowCoreThreadTimeOut(boolean value)`:设置**核心线程**是否允许回收
* `protected void beforeExecute(Thread t, Runnable r)`:钩子函数,处理线程执行任务前的逻辑,这里是**空实现**
* `protected void afterExecute(Runnable r, Throwable t)`:钩子函数,处理线程执行任务后的逻辑,这里是**空实现**
* `public int getActiveCount()`:返回正在执行任务的线程的**大致数量**
* `public long getCompletedTaskCount()`:返回执行完成的任务的**大致数量**
除此之外还需要了解的是,构造方法中的七个参数,除了`BlockingQueue`是不能动态设置外,其余六个参数都可以动态设置,分别调用对于的`setXxx`方法即可,当然也可以通过对于的`getXxx`方法获取对应的信息。
鉴于此,我们再来看一个常见的问题
> Java有几种线程池?
JDK(准确的说是`java.util.concurrent.Executors`工具类)提供了四种线程池:
* `CachedThreadPool`:缓冲线程池
```
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
```
* `FixedThreadPool`:固定线程数的线程池
```
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
```
* `SingleThreadExecutor`:单线程的线程池
```
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
}
```
* `ScheduledThreadPool`:可定时调度的线程池
```
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super()还是调用ThreadPoolExecutor的构造方法
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
}
```
仔细看下这四种线程池,最终都调用了ThreadPoolExecutor的构造方法,只是传递的参数有所不同。
* `CachedThreadPool`和`ScheculedThreadPool`设置的最大线程数都是`Integer.MAX_VALUE`,可能线程数过多而产生OOM
* `SingleThreadExecutor`和`FixedThreadPool`使用的都是*队列,最大元素个数为`Integer.MAX_VALUE`,可能缓冲队列中堆积的任务过多,而产生OOM
这两点正是**阿里巴巴代码规范**里禁止使用这四种线程池的原因。
想要使用线程池,必须通过`ThreadPoolExecutor`的方法来**创建线程池**。