ThreadPoolExecutor类

首先分析内部类:ThreadPoolExecutor$Worker

//Worker对线程和任务做了一个封装,同时它又实现了Runnable接口,
//所以Worker类的线程跑的是自身的run方法
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks; /**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread对象,它的Runnable对象是当前Worker对象
//创建了线程,但是还没启动,在外部start
//Executors.DefaultThreadFactory
this.thread = getThreadFactory().newThread(this);
} /** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
} // Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state. protected boolean isHeldExclusively() {
return getState() != 0;
} protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
} protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
} public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
} final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//调用pool.execute()时传入任务时,如果addWorker返回为true,表示创建了worker,则任务也放在worker对象中了。
//如果addWorker返回为false,则把任务放入队列
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//第二个task是从队列中取得的
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//任务执行完后,置空
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
} private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
} boolean timed; // Are workers subject to culling? for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
} try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

接着分析ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {
//状态变量,保存了workerCount和runState的值
//线程池的初始状态是RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; } //状态值从小到大排列
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS; // 默认为 false,当线程池中已经有了 corePoolSize 个线程,即使这些线程不干活,也不会回收。
// 但是如果线程池中的线程数量超过了 corePoolSize,则会回收
private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
//worker数量小于最小线程数,创建一个worker,并启动
//如果addWorker返回true,表示创建了一个worker对象,任务也放在worker对象中了。
//如果addWorker返回false,则随后把任务放入队列
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池处于运行状态,往队列投任务
//workQueue.offer(command)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果workQueue.offer(command)返回false呢?
//当队列中积压的任务太多时,就会返回false
//这时传给addWorker的是false
else if (!addWorker(command, false))
reject(command);
} //core决定worker数量以corePoolSize和maximumPoolSize中哪一个值为上限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
} boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c); if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
} }

线程池接收任务的流程图:

ThreadPoolExecutor类

关闭线程池有 shutdown 和 shutdownNow 2种方法:

shutdown 不再接收新任务,但会把队列中的任务执行完,shutdownNow 不会执行队列中的任务。

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
} public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
// 删除队列中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

shutdown 方法置线程池状态为 SHUTDOWN,shutdownNow 方法置为 STOP,线程池的线程一旦启动,会不停地从队列中取任务

getTask 的部分逻辑

int c = ctl.get();
// 获取线程池的状态
int rs = runStateOf(c); // Check if queue empty only if necessary.
// 状态如果为 SHUTDOWN,则当队列没有任务时,返回 null,即线程执行完 run 方法,执行结束;如果队列不空,则继续执行队列中的任务
// 状态如果为 STOP,则直接返回 null,不管队列是否有任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

但是,shutdown 和 shutdownNow 一定会关闭线程池吗?这两个方法均是设置了状态,interrupt 了 worker,但是如果 worker 的 run 方法是一个死循环,而且它不关心这个 interrupt 标志位的话,那么线程是无法关闭的。当然正常的业务逻辑中,不会有这种情况。使用了 shutdown 后,因为这只是置标志,所以需要调用 awaitTermination 等线程池真正关闭或者超时。

假如一个 jvm 进程,有一个前台线程,多个 daemon 线程,当前台线程退出后,jvm 进程退出。

线程池创建线程时,daemon 属性默认为 false,即默认前台线程。

// java.util.concurrent.Executors.DefaultThreadFactory#newThread
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

Thread 对象初始化时跟随当前线程的 daemon 属性


Thread parent = currentThread();

this.daemon = parent.isDaemon();
上一篇:sql 多行合一行


下一篇:sql多行转一行,以逗号隔开