ThreadPoolExecutor线程池初探

1 ThreadPoolExecutor线程池初探

1.1 线程池的创建

  • ThreadPoolExecutor

基于Executors创建线程池的方法就不在赘述,本文主要介绍通过ThreadPoolExecutor对象创建线程池

ThreadPoolExecutor的参数共有七个

  1. corePoolSize – 线程池中的核心线程数, 除非设置allowCoreThreadTimeOut(后续会介绍改方法),否则核心线将会一直保留在线程池中,
  2. maximumPoolSize – 线程池中的最大线程个数
  3. keepAliveTime – 超过corePoolSize的线程的最大空闲时间
  4. unit – keepAliveTime的时间单位
  5. workQueue – 用来保存提交到线程池的任务
  6. threadFactory – 线程池工厂
  7. handler – 当maximumPoolSize都在执行任务,workQueue也被已提交任务填满时,多余任务提交到Pool的拒绝策略

值得注意的是,超过corePoolSize的任务被提交到Pool中时,并不会立即创建线程来执行,而是会先将任务防止的workQueue中,当workQueue被任务填满后,才会创建线程来执行新提交的任务,知道线程数达到maximunPoolSize。

ThreadPoolExecutor线程池初探

如下代码所示

向线程池中提交6个任务(T1~T6)时,线程池的线程数一直为1,超过coreSize的任务会在workQueue中等待
但当第7个任务提交时,由于coreSize线程都在执行任务,且wokrQueue已经满了,线程池会创建新的线程(不大于最大线程数的情况下)来执行T7任务

public static void main(String[] args){
        System.out.println("begin");
        ThreadPoolExecutor pool=new ThreadPoolExecutor(1,2,30L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 6; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1*1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("执行当前任务的线程名称:"+Thread.currentThread().getName());
                }
            });
        }

         pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3*1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第七个任务:"+Thread.currentThread().getName());
            }
        });

        System.out.println("当前线程池中运行的线程个数:"+pool.getActiveCount());
        pool.shutdown();
        System.out.println("end");
    }

可以看到上面的代码运行结果如下,最后提交的任务反而优先执行(pool.getActiveCount() 并不一定能获取准确的运行数,这个后续在介绍)

begin
当前线程池中运行的线程个数:2
end
执行当前任务的线程名称:pool-1-thread-1
不在阻塞被执行:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1

从ThreadPoolExecutor#execute也可以看出如下逻辑

  int c = ctl.get();
  //当前运行worker数如果小于核心线程数
  if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
    }
  //pool的状态为RUNNING,则入队
  if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查,并发操作可能pool已经处于非RUNNING状态
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果work为0,新增work,防止workQueue中的task一直无法被执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
   } 
   //入队失败,则尝试创建woeker执行,创建失败执行拒绝策略
   else if (!addWorker(command, false))
            reject(command);        
          

1.2 线程池如何从workQueue获取Task

线程中用来执行任务的其实是ThreadPoolExecutor的一个私有内部类Worker实现了AQS和Runnable接口
它内部通过ThreadFactory持有一个Thread

调用worker的start方法实际执行的是runWorker方法,该方法中会从阻塞队列中不断获取task来执行
同时线程的空闲销毁也是在该处来判断的

Worker#runWorker方法如下所示:

//该标志用来判断work是否被异常退出
 boolean completedAbruptly = true;
 try {
    //当前work的task不为空,或超时时间内从workqueue成功获取任务,则执行对应task
    while (task != null || (task = getTask()) != null) {
        //暂时省略               
}
    //从workQueue中获取task失败(超时或pool被关闭)
    completedAbruptly = false;        
} finally {
    //销毁worker
    processWorkerExit(w, completedAbruptly);        
}       

1.2 线程池中线程的销毁

Worker#getTask方法

// 是否允许核心线程池过期或当前worker数量是否大于核心线程池
 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

 //timedOut代表上次获取是否超时
//pool可以动态调整maximumPoolSize大小,因此需要判断是否大于最大线程池数量或 当前线程数量大于核心数量,且上次获取Task超时,获取Task设置的超时时间是keepAliveTime,代表worker空闲时间大于等于keepAliveTime ,此时返回null,runWorker中的循环会结束,执行finally中的方法
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;            
    }
    try {
         Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;       
    } catch (InterruptedException retry) {
                timedOut = false;
    }         

Worker#processWorkerExit


//如果completedAbruptly为True,代表异常退出,需要workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();       
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        //销毁线程worker
        workers.remove(w);
        } finally {
            mainLock.unlock();
        }       
    //判断pool是否需要关闭
    tryTerminate();
    int c = ctl.get();
        //如果线程池处于RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            //未被异常退出,可能是获取超时,次数queue中可能还有剩余任务,
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //若worker数量为0,需要创建worker来完成queue中剩余的任务
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //创建woker,保证消费queue中的任务
            addWorker(null, false);
        }   

暂时粗略的介绍了线程池创建的参数,以及提交任务和线程池中线程的销毁策略,还有比较多的细节知识会在后续慢慢补充。

上一篇:并发编程-Executor具体实现之ThreadPoolExecutor


下一篇:AutoFac(五)——通过lambda表达式灵活注册