ThreadPoolExecutor线程池源码
一、基础知识
java 中的& 和 | 运算
& 是所有的2进制位数“与”出的最终结果,“与”的规则是两者都为1时才得1,否则就得0
-
| 是所有的2进制位数“或”出的最终结果,“或”的规则是两者之一有一个1就得1,否则就得0
- 是所有的2进制位数“非”出的最终结果,如果位为0,结果是1,如果位为1,结果是0.
~(A) = -(A+1)
位运算
例如:1 << 2 向左移2位
0001 -> 0100 = 4
Integer.SIZE 为何是32?
Integer 值的大小范围为 -231~231-1,int类型数据占4字节,1一个 字节8个bit位。
-1的二进制表示
先取1的原码:00000000 00000000 00000000 00000001
得反码: 11111111 11111111 11111111 11111110
得补码: 11111111 11111111 11111111 11111111
二、线程池使用
newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
/**
参数:
corePoolSize : 核心线程数
maximumPoolSize:最大线程数
keepAliveTime:非核心线程数最大的空闲时间
TimeUnit:时间单位
workQueue:工作队列
1. 同步队列 SynchronousQueue 长度为0
2. 有界队列 ArrayBlockingQueue 长度自己指定
3. *队列 LinkedBlockingQueue *
threadFactory:线程工厂,用来创建线程池中的线程的,可以自己实现这个线程工厂。
handler:拒绝策略,可以自己实现拒绝策略。
jdk自身实现了4种:
(1)ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。默认
(2)ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用线程执行该任务。
(3)ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。
(4)ThreadPoolExecutor.DiscardPolicy,丢弃任务,不抛出异常。
注:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。
**/
几种常用写法(入口调用不同方法,底层都是调用的ThreadPoolExecutor的execute方法)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 100, 60,
TimeUnit.MICROSECONDS, new SynchronousQueue<>(), new MyThreadFactory(), new MyRejectedExecutionHandler());
try {
// for (int i = 1; i < 100; i++) {
// threadPoolExecutor.execute(new MyTask(i));
// }
//
// for (int i = 1; i < 100; i++) {
// threadPoolExecutor.submit(new MyCallable(i));
// }
List<MyCallable> tasks = new ArrayList<MyCallable>();
for (int i = 1; i <= 105; i++) {
tasks.add(new MyCallable(i));
}
threadPoolExecutor.invokeAll(tasks);
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
class MyTask extends Thread {
private int i;
public MyTask(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "=" + i);
}
}
class MyCallable implements Callable<Object> {
private int i;
public MyCallable(int i) {
this.i = i;
}
@Override
public Object call() throws Exception {
System.out.println("Callable" + Thread.currentThread().getName() + "=" + i);
return i;
}
}
// 自定义线程工厂
class MyThreadFactory implements ThreadFactory{
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final Long stackSize;
MyThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "myPool" +
poolNumber.getAndIncrement() +
"-thread-";
stackSize = 1024L;
}
/**
* 创建线程时指定线程所在组,线程预期栈大小,优先级,是否为守护线程,名字等。
*
* @param r
* @return
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
stackSize);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
// 自定义任务拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任务被拒绝");
}
}
任务添加顺序:
核心线程–》工作队列–》非核心线程。见源码解析
任务执行顺序:
核心线程–》非核心线程–》工作队列。见源码解析
三、源码分析
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // ctl = running
private static final int COUNT_BITS = Integer.SIZE - 3; //COUNT_BITS = 29;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //0010 - 1= 2^29-1
// runState is stored in the high-order bits 线程池运行状态存储在高3位,剩下的低29位用来记录当前工作线程数量,高明之处就是一个int类型的值,去记录线程池中几个维度的状态和数据
// 能接受新任务以及处理已添加的任务
private static final int RUNNING = -1 << COUNT_BITS; // 1110
// 不接受新任务,可以处理已经添加的任务,调用shutdown方法时
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0000
// 不接受新任务,不处理已经添加的任务,并且中断正在处理的任务 调用shutdownnow方法时
private static final int STOP = 1 << COUNT_BITS; // 0010
// 所有的任务已经终止,ctl记录的“任务数量” 为0 ,ctl负责记录线程池的运行状态与活动线程数量
private static final int TIDYING = 2 << COUNT_BITS; // 0100
// 线程池彻底终止,则线程池转变为terminated 状态
private static final int TERMINATED = 3 << COUNT_BITS; // 0110
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取ctl的值,初始值是running的值
int c = ctl.get();
// 获取到工作线程的数量 和 核心线程数进行比较,小于核心线程数,则增加ctl的值+1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池是不是running状态,同时往队列里面添加任务是否成功
if (isRunning(c) && workQueue.offer(command)) {
//
int recheck = ctl.get();
// 再次判断是不是running状态,不是的话,直接移除任务
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 核心线程已满 队列已满,就走下面
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
// 除了加入的队列的任务之外,其他的任务进来都会把ctl这个值加1,代表线程池的工作线程数加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 池中线程启动的地方
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 这里体现了线程池中线程的复用,这里就有一个问题,这里和调addWorker的地方是异步的,所以这里可能是核心线程触发的,但是会读到后续添加到队列中的任务 6
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 我们传递的任务被执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; ******
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 是否异常结束的
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 减少线程池中工作线程数量
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 非原子操作,固加锁
completedTaskCount += w.completedTasks;
// 从线程池中移除当前工作线程
workers.remove(w);
} finally {
// 解锁
mainLock.unlock();
}
// 尝试中断线程池中的线程,假如当前线程池不处于running状态
tryTerminate();
// 获取ctl值
int c = ctl.get();
// 判断线程池的状态是不是小于stop的 就是 running,shutdown ,假如上一步将线程池的状态改为了TIDYING 或者 TERMINATED 状态下面就不会走了
if (runStateLessThan(c, STOP)) {
// 很显然这里是true
if (!completedAbruptly) {
// min = corePoolSize 一般不会0,看自己设置的是多大
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 假如为0,同时工作队列不是空的,那么min = 1;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程池的工作线程 大于或等于了 min (我觉得就可以理解为核心线程数) 直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false); // ?
}
}
// 这个方法 只要return null, 这个工作线程就会死掉,同时线程池中的工作线程数就减一
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// The pool is stopped. The pool is shutdown and the queue is empty.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 核心线程是不是开启了超时设置 工作线程是不是大于核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
if ((wc > maximumPoolSize || (timed && timedOut)) // 7
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 1 timed 为true 时,一般来说就是工作线程大于了核心线程,非核心线程就需要就等待最长空闲时间,2.timed为false时,只有核心线程,然后就一直wait 主线程往队列里面添加任务,一有任务就开始处理,直到shutdown
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); // 8
// poll 是等待这个时间,没有的话就返回null,take 是 阻塞
if (r != null)
return r;
// 当r为null时,代表非核心线程超时了,再次循环,7的地方,就直接返回null了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
// 由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的
总结:我们传递给线程池的任务task,最后在worker(线程池中的线程)的run方法中被执行,而且是以task.run()的方法进行直接调用的。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
疑问:
-
核心线程和非核心线程都是用ctl这个值来维护的,
是的,都是由ctl这个值来维护。
-
线程池如何区分池中的线程是核心的还是非核心的呢?没有看到worker携带相关标识
不用区分,在没有设置核心线程超时的情况时,非核心线程是优先被清理的。
-
workers的数量何时减少,我们是设置了一个空闲等待时间,啥时候触发?
由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的
-
什么时候从队列中获取任务并执行的?
当工作线程(包括核心非核心)执行完首次分配的任务后,都会从队列中获取任务
-
线程的复用在哪里体现了?
第6处
-
为何设置核心线程数可以超时时,当核心线程空闲时间超过空闲时间时,线程池会自动退出?
因为线程池的运行状态 也是由内部的工作线程维持的,当线程池中没有工作线程是,自然就停止了。
源码中所用设计模式:
- 策略模式:线程工厂和拒绝策略;
- 适配器模式:public class ThreadPoolExecutor extends abstract class AbstractExecutorService implements ExecutorService 。抽象类中实现了接口部分方法,具体类中也实现了接口的部分方法,典型的接口适配器模式。