看看
getTask()
// 此方法有三种可能 // 1. 阻塞直到获取到任务返回。默认 corePoolSize 之内的线程是不会被回收的,它们会一直等待任务 // 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭 // 3. 如果发生了以下条件,须返回 null // 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置) // 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务 // 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭 // 这里 break,是为了不往下执行后一个 if (compareAndDecrementWorkerCount(c)) // 两个 if 一起看:如果当前线程数 wc > maximumPoolSize,或者超时,都返回 null // 那这里的问题来了,wc > maximumPoolSize 的情况,为什么要返回 null? // 换句话说,返回 null 意味着关闭线程。 // 那是因为有可能开发者调用了 setMaximumPoolSize 将线程池的 maximumPoolSize 调小了 // 如果此 worker 发生了中断,采取的方案是重试 // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法, // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量, // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操作,减少工作线程数 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 如果此 worker 发生了中断,采取的方案是重试 // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法, // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量, // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null timedOut = false; } } }
到这里,基本上也说完了整个流程,回到 execute(Runnable command) 方法,看看各个分支,我把代码贴过来一下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //表示 “线程池状态” 和 “线程数” 的整数 int c = ctl.get(); // 如果当前线程数少于核心线程数,直接添加一个 worker 执行任务, // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,即结束 // 执行的结果,会包装到 FutureTask // 返回 false 代表线程池不允许提交任务 if (addWorker(command, true)) return; c = ctl.get(); } // 到这说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败 // 如果线程池处于 RUNNING ,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 若任务进入 workQueue,我们是否需要开启新的线程 * 线程数在 [0, corePoolSize) 是无条件开启新线程的 * 若线程数已经大于等于 corePoolSize,则将任务添加到队列中,然后进到这里 */ int recheck = ctl.get(); // 若线程池不处于 RUNNING ,则移除已经入队的这个任务,并且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若线程池还是 RUNNING ,且线程数为 0,则开启新的线程 // 这块代码的真正意图:担心任务提交到队列中了,但是线程都关闭了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若 workQueue 满,到该分支 // 以 maximumPoolSize 为界创建新 worker, // 若失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行.我们可以从Worker类的run()方法里看到这点
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //先加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
线程池中的线程执行任务分两种情况
- 在execute()方法中创建一个线程时,会让这个线程执行当前任务
- 这个线程执行完上图中 1 的任务后,会反复从BlockingQueue获取任务来执行