线程池中的线程为什么可以复用?
线程池底层维护了一个HashMap集合用来存放worker对象,这个worker类实现了runnable接口,代表它是一个可执行的任务,worker类中有两个重要属性:具体工作线程,第一次要执行的任务。
初始化worker类时,它会创建一个线程并将当先对象封装到线程中,也就是说当此线程启动时执行的是此对象的run方法。而worker类中的run方法,他其实调用了runworker()方法,此方法是真正实现线程复用的地方
此方法中用一个while循环用来不断地获得任务,交给当前线程去执行。
底层实现分析
//存放工作线程的集合,线程池的底层实现,是ThreadPollExecutor类中的一个属性
private final HashSet<Worker> workers = new HashSet<Worker>();
worker类
Worker实现了Runnable接口,代表是一个可执行的任务,
- worker中的两个重要属性
final Thread thread;//具体工作的线程
Runnable firstTask;//第一次要执行的任务
- worker构造器,初始化时为上面两个属性赋值
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//getThreadFactory()得到一个线程工厂的方法
//newThread(this):传入Runnable任务得到一个线程,调用此线程的start()方法时,会调用此类中的run()方法
this.thread = getThreadFactory().newThread(this);
}
- worker类中的run()方法
public void run() {
runWorker(this);
}
- worker类中的runWorker(this)方法(线程的复用在此处体现)
//此方法中的主要代码
final void runWorker(Worker w) {
Runnable task = w.firstTask;//辅助变量用来存放获取的任务
while (task != null || (task = getTask()) != null) {//不断地调用getTask()方法获得新的任务
try(){
task.run();//执行到此代码,就完成了一个任务,之后循环获得新的任务再次执行执行。
}catch (Throwable x) {
thrown = x; throw new Error(x);
}
}
}
-
worker类中的getTask()方法
getTask():获取线程任务,通过两种方式从任务队列中获得线程任务,一种是核心线程获取任务,一种是临时线程获取任务。
//getTask()方法中获取任务的核心方法
/**
*workQueue:任务队列
*r = timed:判断此线程是否为核心线程
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://临时线程获取任务方法,超时获取就会退出循环,线程被回收
workQueue.take();//核心线程获取任务方法,获取不到就一直阻塞着
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
任务队列何时被创建
- execute():方法实现
//execute()核心代码
int c = ctl.get();
//当前线程工作数与核心线程相比较,小于创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//尝试向工作队列中添加任务,添加成功则进入if
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//对线程池进行一个状态的检查
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//尝试创建临时线程,失败则进行拒绝策略的调用
else if (!addWorker(command, false))
reject(command);
}
- addWork()方法
//创建Work工作任务并入workers队列
for (;;) {
int wc = workerCountOf(c);//当前工作线程的数量
if (wc >= CAPACITY ||
//判断当前(核心)线程数是否大于(核心)最大线程数
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
}
try{
w = new Worker(firstTask);//创建worker,并将任务传给worker,此类中创建工作线程
final Thread t = w.thread;
if(t!=null){
workers.add(worker);//将新创建的worker添加到集合中
workerAdded = true;//添加成功
if (workerAdded) {//添加成功
t.start();//开启线程
workerStarted = true;
}
}
}