Java 并发:Executor ExecutorService ThreadPoolExecutor

Executor

Executor仅仅是一个简单的接口,其定义如下

public interface Executor {
void execute(Runnable command);
}

作为一个简单的线程池的话,实现这个接口就可以使用了。不过单单这样的话,无法使用Future功能。

ExecutorService

public interface ExecutorService extends Executor {
void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService接口扩展了Executor接口,加入了线程池生命周期的管理,还加入了Future功能。除了提交runnable外还可以使用Callable用于返回结果的任务。这里要注意execute和submit的区别,execute是用于实现Executor接口的,submit则提供了任务的Future机制,submit的实现是基于线程池execute基本功能的。实际上Future机制的大部分代码都在FutureTask这个类里,反倒和线程池关系不大。

AbstractExecutorService

AbstractExecutorService实现了部分invoke系列接口和submit系列接口,它们都依赖子类实现的execute方法。这也说明实现线程池关键是提供管理其生命周期和执行任务的接口,至于submit提供的Future机制可以基于这些很快的实现,比如:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
} public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

ThreadPoolExecutor

ThreadPoolExecutor是JDK中主要线程池的一个实现,提供了多种不同的构造函数可以依据参数得到不同特性的线程池,它对于的工厂方法类为Executors

Java 并发:Executor ExecutorService ThreadPoolExecutor

组成成分

ThreadPoolExecutor从其工厂函数就可以大致看出各个组成成分,具体如下

任务队列

通过execute提交的任务(submit操作最后也通过execute进行任务执行),会有可能先进入任务队列而不是立即被线程池执行。这依赖于当前的线程池状态和设定的参数,如果当前创建的线程数尚未达到corePoolSize那么会立即创建一个线程,否则则会尝试加入队列之中。

线程集合

作为一个线程池,它肯定需要创建线程,并保存这些线程的状态信息。因为线程池内的线程是专门用来运行提交的Runnable活着Callable任务的,他们除了维护状态信息外基本不会为自己干点什么,一般这样的线程叫做worker,或工作者线程。在内部使用HashSet保存这些线程对象。

线程工厂

可以按照需要定制thread对象,比如设置线程池内线程名称,调整daemon属性等。

拒绝策略

如果线程池处理数量达到上限(队列已满且已有线程数达到maximumPoolSize)则开始拒绝任务,相当于提供了一个钩子函数

池内线程

线程池内的线程除了运行用户提交的任务外,还需要维护自己的一些状态信息。这JDK的实现中工作者线程运行逻辑用一个实现了Runnable接口并继承了AbstractQueuedSynchronizer的一个类Worker来表示。它没有继承Thread类,而只是实现了Runnable接口,具体创建线程的过程交给了用户可以自己定制的ThreadFactory线程工厂。

        Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

线程生成

线程的生成均由addWorker这个函数进行,从线程池创建后,线程生成主要发生在几种情况下:

  • 提交了一个任务并且当前线程数量小于corePoolSize(不管是否有空闲着的线程)
  • 提交了一个任务并且(a.没有线程空闲 & b.任务队列无法添加任务 & c.任务数小于设定的最大值)
  • 调用了prestartAllCoreThreads,由于在创建后生成所有corePoolSize数量的线程
  • 调用了prestartCoreThread,用于在创建后生成一个线程,如果数量已经达到corePoolSize则忽略

后面两个方法可以方便的来预热线程池。如上述给出的Worker构造函数可知它又一个参数叫做firstTask,这是因为一般情况下线程的创建都是因为有任务提交引起的(也就是说一个线程池创建后并不会马上产生指定池大小数量的线程),firstTask是该Worker线程第一个运行的任务。当Worker线程运行完第一个任务后,它获取新任务的方式就发生了改变,它会阻塞在任务队列上,等待新任务的到来,firstTask基本就不再使用了。

为什么要采取这样的方式?如果线程池是一个固定大小的,一创建后立即生成所有工作者线程的这样的一种实现,就完全可以把任务放到队列中,所有的Worker线程都从队列里获取要执行的任务。但JDK里实现支持动态的添加工作者线程,新创建的线程总是运行刚刚使得它被创建的那个任务提交。如果放到队列里的话还要进行等待其他的任务先被执行。不过这么说也有些牵强,也未必后到得任务就更重要,反而让前面排队的任务等着。

线程运行

Worker对象实现了runnable接口由ThreadFactory给出的Thread对象负责正真的执行(Thread.start),然后再Worker的run方法中会去执行它接收到得任务,

        public void run() {
runWorker(this);
}

关键过程如runWorker函数:

    final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
  • task = getTask()就是从任务队列取任务的过程,如果没有任务的话会阻塞着,线程是处于WAITING状态,也即空闲状态
  • task.run()即运行提交的任务
  • w.completedTasks++每个worker都会统计自己运行的任务数,通过线程池可以获得一个大概的总数,之所以是大概因为执行的任务时刻会完成,也没必要用锁去保证这个数字。
  • beforeExecuteafterExecute是线程池的钩子函数可以被子类覆盖用于实现一些统计功能,但要注意这些是会被线程池内不同线程执行,所以一般要用到threadlocal机制。

线程状态

为什么Worker要继承AbstractQueuedSynchronizer,因为它要维护自己的一个状态变更过程,而且是要支持等待的,其实用一般的lock也可以,不过可能doug lea觉得没必要再隔一层吧(lock也是用AQS实现的)。状态值有下面几个:

  • -1 Worker对象被创建但还没有对应的Thread开始运行它,初始化时设置
  • 0 已经有对应的Thread运行Worker的run方法,但没有在运行Worker接收的任务内容,worker.unlock(),表示当前线程空闲
  • 1 正在运行任务,worker.lock()

    实际上用到worker.lock/tryLock的地方并不多,一个是在runWorker内部,一个就在interruptIdleWorkers这里。这里不得不提下shutdown方法它负责把线程池关掉,但是并不是很暴力,只是让队列停止接收任务,而让已经执行的任务继续直到所有已提交任务被完成。所以在这里要有选择的interrupt线程,即选择那些处于idle状态的线程。而idle状态的线程按照前面的设定就是状态值为0/-1,即可以获得lock的那些线程。
    private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

主要过程

任务提交

任务提交会导致新德工作者线程生成

    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

线程退出

工作线程的退出有一下几个原因:

0. 线程抛出异常

  1. 线程池的shutdown方法被调用
  2. 线程池的shutdownNow方法被调用
  3. 线程池当前线程数超过了corePoolSize并且获取任务等待时长超过keepAliveTime
  4. 线程池设定了allowCoreThreadTimeOut = true,并且获取任务等待时长超过keepAliveTime

任务异常

当工作线程执行的任务抛出异常时,工作者线程会退出。当时在完全退出前会执行processWorkerExit(w, completedAbruptly);,有异常抛出时completedAbruptlytrue,所以在该函数中如果发现当前工作者线程是因为异常而退出的会尝试着再次执行一个addWorker调用来补上这个要退出的线程。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
} tryTerminate(); int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

可以发现如果此时线程池要关闭或者线程数量已经超过当前条件的最小值则不进行线程补充。这个最小值的产生很微妙。

线程池收缩

通过设定合适的keepAliveTime可以让线程池多余corePoolSize的线程在一定时间后主动退出,实现线程池的动态收缩,如果设定了allowCoreThreadTimeOut = true的话连core线程也可以自动退出,直到一个线程都没有,从getTask观察得到:

    private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
} boolean timed; // Are workers subject to culling? for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
} try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

可以发现当需要自动收缩时通过带有超时参数的poll函数去取得任务队列(workQueue)内的任务,而一般情况下则使用take调用无限阻塞。

通过返回一个null值可以使得runWorker中的循环退出转而执行processWorkerExit,注意在Worker线程完全退出前已经通过compareAndDecrementWorkerCount将当前Worker线程的数量给减少了,因为直到收到null后的工作线程循环肯定会马上退出不再处理后续任务了,这也是为什么在前面processWorkerExit函数内要选择性的进行计数减的原因。

shutdown

如前面提到的shutdown是一种温和的关闭线程池的方式,它不会去interrupt已在运行任务的线程。

    public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

它使用了interruptIdleWorkers去interrupt那些处于idle状态的工作者线程。一旦线程中的任务响应了interrupt请求或主动退出或抛出InterruptedException都会使得工作者线程退出执行processWorkerExit方法并进而调用tryTerminate,使其在平时调用processWorkerExit时也会执行tryTerminate不过不必慌张,因为处于运行状态的线程池后缀不做什么立即返回。

    final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
} final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

(workerCountOf(c) != 0)时即还有任务运行时都会尝试着去interrupt其中的空闲工作者线程(而这些线程退出又会执行tryTerminate方法,形成一个链式的传递)。而那些正在执行任务的工作者线程,虽然现在不能去中断他们,但在在完成任务后它们会发现线程池已经处于要关闭的状态也会主动退出。当所有的工作者线程都退出时执行termination.signalAll();唤醒在termination条件队列上等的线程。一般是通过调用awaitTermination方法等待线程池完全退出。

上一篇:ExecutorService与Executors例子的简单剖析


下一篇:class用法