线程池中的线程为什么可以复用?

线程池中的线程为什么可以复用?

线程池底层维护了一个HashMap集合用来存放worker对象,这个worker类实现了runnable接口,代表它是一个可执行的任务,worker类中有两个重要属性:具体工作线程,第一次要执行的任务。

初始化worker类时,它会创建一个线程并将当先对象封装到线程中,也就是说当此线程启动时执行的是此对象的run方法。而worker类中的run方法,他其实调用了runworker()方法,此方法是真正实现线程复用的地方

此方法中用一个while循环用来不断地获得任务,交给当前线程去执行。

底层实现分析

//存放工作线程的集合,线程池的底层实现,是ThreadPollExecutor类中的一个属性
private final HashSet<Worker> workers = new HashSet<Worker>();
worker类

Worker实现了Runnable接口,代表是一个可执行的任务,

  • worker中的两个重要属性
final Thread thread;//具体工作的线程
Runnable firstTask;//第一次要执行的任务

  • worker构造器,初始化时为上面两个属性赋值
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    //getThreadFactory()得到一个线程工厂的方法
    //newThread(this):传入Runnable任务得到一个线程,调用此线程的start()方法时,会调用此类中的run()方法
    this.thread = getThreadFactory().newThread(this);
}
  • worker类中的run()方法
public void run() {
    runWorker(this);
}
  • worker类中的runWorker(this)方法(线程的复用在此处体现)
//此方法中的主要代码
final void runWorker(Worker w) {
Runnable task = w.firstTask;//辅助变量用来存放获取的任务
     while (task != null || (task = getTask()) != null) {//不断地调用getTask()方法获得新的任务
      	try(){
            task.run();//执行到此代码,就完成了一个任务,之后循环获得新的任务再次执行执行。
        }catch (Throwable x) {
            thrown = x; throw new Error(x);
        } 
     }
}
  • worker类中的getTask()方法

    getTask():获取线程任务,通过两种方式从任务队列中获得线程任务,一种是核心线程获取任务,一种是临时线程获取任务。

//getTask()方法中获取任务的核心方法
/**
*workQueue:任务队列
*r = timed:判断此线程是否为核心线程
*/
try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://临时线程获取任务方法,超时获取就会退出循环,线程被回收
        workQueue.take();//核心线程获取任务方法,获取不到就一直阻塞着
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}
任务队列何时被创建
  • execute():方法实现
//execute()核心代码
int c = ctl.get();
//当前线程工作数与核心线程相比较,小于创建核心线程
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
//尝试向工作队列中添加任务,添加成功则进入if
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))//对线程池进行一个状态的检查
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//尝试创建临时线程,失败则进行拒绝策略的调用
else if (!addWorker(command, false))
    reject(command);
}
  • addWork()方法
//创建Work工作任务并入workers队列
for (;;) {
    int wc = workerCountOf(c);//当前工作线程的数量
    if (wc >= CAPACITY ||
        //判断当前(核心)线程数是否大于(核心)最大线程数
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
}

try{
     w = new Worker(firstTask);//创建worker,并将任务传给worker,此类中创建工作线程
    final Thread t = w.thread;
    if(t!=null){
        workers.add(worker);//将新创建的worker添加到集合中
        workerAdded = true;//添加成功
        if (workerAdded) {//添加成功
            t.start();//开启线程
            workerStarted = true;
        }
    }
}
上一篇:浅析浏览器跨页面通信的方式:localStorage+StorageEvent事件、BroadCast Channel广播通信、Service Worker消息中转、postMessage、直接引用-


下一篇:解析ThreadPoolExecutor类是如何保证线程池正确运行的