java.util.concurrent Explained: ThreadPoolExecutor Source Code Analysis

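In any language or framework, threads are a central concern. Asynchronous execution needs extra threads, but creating and destroying threads frequently is expensive. Thread pools exist to solve exactly this problem. In short, a thread pool offers the following advantages: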

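  • Lower resource consumption: reusing threads that already exist avoids the overhead and performance loss of repeatedly creating and destroying them
  • Faster task startup: a task can start on an existing thread instead of waiting for a new one to be created
  • Easier management: a thread pool allocates, tunes, and monitors threads in one place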

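In Java, thread pools are built on ThreadPoolExecutor. Most of the pool strategies we use through ExecutorService, such as those created by the Executors factory methods, are implemented on top of it. That makes ThreadPoolExecutor very important: to understand the various pool strategies, you first need to understand ThreadPoolExecutor.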

1 Creating a Thread Pool

First, let's look at how a thread pool is created:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
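  • corePoolSize: the number of core threads, which are kept even when idle (unless allowCoreThreadTimeOut is enabled)
  • maximumPoolSize: the maximum number of threads the pool may hold
  • keepAliveTime: how long a thread beyond the core size may stay idle before it is terminated
  • unit: the TimeUnit of keepAliveTime
  • threadFactory: the ThreadFactory used to create new threads
  • workQueue: the BlockingQueue that holds tasks waiting to be executed
  • handler: the RejectedExecutionHandler (rejection policy) invoked when a task cannot be accepted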
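As a quick illustration of these parameters, here is a minimal sketch that builds a pool with the full seven-argument constructor. The class name CreatePoolExample and the concrete values are arbitrary assumptions chosen for the example, not recommendations: 2 core threads, 4 maximum, a 30-second keep-alive, and a bounded queue of 10. It also shows the constructor's argument check rejecting maximumPoolSize < corePoolSize.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CreatePoolExample {
    // Seven-argument constructor; the sizes are illustrative only.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),          // workQueue: bounded, 10 slots
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler: throw on rejection
    }

    // maximumPoolSize < corePoolSize fails the constructor's argument check.
    static boolean rejectsInvalidSizes() {
        try {
            new ThreadPoolExecutor(4, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("invalid sizes rejected: " + rejectsInvalidSizes());
    }
}
```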


The basic workflow of ThreadPoolExecutor is as follows:

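  • When a task is submitted via submit or execute and the pool has fewer than corePoolSize threads, a new thread is created to run the task directly
  • If the pool already has corePoolSize or more threads, the task is added to the BlockingQueue
  • If the BlockingQueue is full and the pool has fewer than maximumPoolSize threads, a new thread is created to run the task
  • If the pool already has maximumPoolSize threads, the RejectedExecutionHandler is invoked
  • When the pool has more than corePoolSize threads, threads that stay idle for keepAliveTime are terminated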
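To see these steps in action, the following sketch uses corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1. The class name PoolFlowDemo and the sizes are illustrative assumptions. Every task blocks on a latch, so no thread becomes free while tasks are being submitted. Submitting four tasks should walk through the flow in order:

- a new core thread
- queued
- an extra non-core thread
- rejected by the default AbortPolicy

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolFlowDemo {
    // Submits four blocking tasks and records the pool size and queue length after each one.
    static String[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] outcome = new String[4];
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
                outcome[i] = "threads=" + pool.getPoolSize() + ",queued=" + pool.getQueue().size();
            } catch (RejectedExecutionException e) {
                outcome[i] = "rejected";         // AbortPolicy, the default handler
            }
        }
        release.countDown();
        pool.shutdown();
        return outcome;
    }

    public static void main(String[] args) {
        for (String s : run())
            System.out.println(s);
    }
}
```

main prints threads=1,queued=0, then threads=1,queued=1, then threads=2,queued=1, then rejected. The first task never enters the queue because it is handed to the new Worker as its first task.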

2 Thread Pool States

ThreadPoolExecutor has several internal states, and understanding them is essential to understanding how the pool works. Let's look at them next:

    /*
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty (the queue has already been drained)
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
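These transitions can be observed from outside through the public API. The sketch below is an illustration built on assumptions: the class name LifecycleDemo, single-thread pools, and a 10-second sleeping task. It is not part of ThreadPoolExecutor. After shutdown() (RUNNING -> SHUTDOWN), new tasks are rejected. shutdownNow() (-> STOP) interrupts the running task and returns the tasks still waiting in the queue. awaitTermination() then returns once the state reaches TERMINATED.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LifecycleDemo {
    // RUNNING -> SHUTDOWN: new tasks are rejected.
    static boolean shutdownRejectsNewTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return pool.isShutdown();
        }
    }

    // -> STOP: queued tasks are drained and returned, the running task is interrupted.
    static int shutdownNowReturnsQueued() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {                     // becomes the Worker's first task
            started.countDown();
            try {
                Thread.sleep(10_000);            // cut short by shutdownNow()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });                 // these two wait in the queue
        pool.execute(() -> { });
        try {
            started.await();
            List<Runnable> pending = pool.shutdownNow();
            // returns true once the state reaches TERMINATED
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return terminated ? pending.size() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown rejects new tasks: " + shutdownRejectsNewTasks());
        System.out.println("tasks returned by shutdownNow: " + shutdownNowReturnsQueued());
    }
}
```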

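How runState is stored is also worth a closer look. It is not kept in a separate int or enum. Instead, it is packed together with the thread count workerCount into a single atomic variable, ctl: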

    // ctl packs the pool's run state and its worker count into one atomic int:
    // the high 3 bits hold runState, the low 29 bits hold workerCount.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of bits used for workerCount
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum worker count: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001
    private static final int STOP       =  1 << COUNT_BITS;
    // 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    // Extract the run state: keep the high 3 bits
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Extract the worker count: keep the low 29 bits
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine a run state and a worker count into a ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf() extracts the pool's current run state
  • workerCountOf() extracts the current number of worker threads
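The bit layout can be checked in isolation. The class below copies the constants and helpers from the excerpt above into a standalone, hypothetical CtlPacking class. It also adds an isRunning check of the form c < SHUTDOWN, mirroring the private isRunning helper in the JDK source. It is a sketch for experimentation, not the JDK class itself. RUNNING is negative and the other states grow upward, so a whole ctl value can be compared against a state directly. Adding 1 to ctl increments only workerCount.

```java
public class CtlPacking {
    // Constants copied from the ThreadPoolExecutor excerpt, for illustration only.
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 2^29 - 1
    static final int RUNNING    = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // high bits 000
    static final int STOP       =  1 << COUNT_BITS;        // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;        // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;        // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; } // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                       // RUNNING with 5 workers
        System.out.println(Integer.toHexString(c));      // e0000005
        System.out.println(runStateOf(c) == RUNNING);    // true
        System.out.println(workerCountOf(c));            // 5
        System.out.println(workerCountOf(c + 1));        // 6: +1 touches only the low bits
        System.out.println(isRunning(c) + " " + isRunning(ctlOf(SHUTDOWN, 5))); // true false
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED);                  // true
    }
}
```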

3 Adding Tasks

Tasks are usually added to a thread pool through execute or submit. Let's use execute to see how adding a task works:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Fewer workers than corePoolSize: start a new Worker with this task as its first task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // At or above corePoolSize (or addWorker failed): try to enqueue the task
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Re-check in case the pool state changed after the offer:
            // if it is no longer running, take the task back out of the queue and reject it
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If the worker count dropped to 0 while the task was being queued, no Worker
            // would run it, so start a Worker with no first task to consume the queue
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Queue full: the pool may still be below maximumPoolSize, so try a non-core Worker
        else if (!addWorker(command, false))
            reject(command); // maximumPoolSize reached (or pool shut down): reject the task
    }
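The workerCountOf(recheck) == 0 branch matters most when corePoolSize is 0. In that case execute() never takes the first branch, so the task is only queued. Without addWorker(null, false), nothing would ever run it. The sketch below shows that such a task still gets executed by a Worker started with a null first task. The class name ZeroCoreDemo and the pool sizes are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ZeroCoreDemo {
    // corePoolSize = 0: the task is offered to the queue, then the recheck sees
    // zero workers and starts one without a first task to drain the queue.
    static boolean queuedTaskRuns() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch ran = new CountDownLatch(1);
        pool.execute(ran::countDown);
        try {
            return ran.await(5, TimeUnit.SECONDS);   // true if the task was executed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued task ran: " + queuedTaskRuns());
    }
}
```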

The core logic of execute lies in addWorker, which adds worker threads, so let's look at addWorker next:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            /**
             * Refuse to add a Worker if:
             *  - the state is above SHUTDOWN; or
             *  - the state is SHUTDOWN and firstTask != null (no new tasks after shutdown); or
             *  - the state is SHUTDOWN and the work queue is empty (nothing left to consume).
             * Only in SHUTDOWN with a non-empty queue may a Worker with a null
             * firstTask be added, to help consume the remaining tasks.
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // Worker count already at its limit: return false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Below the limit: try to CAS workerCount + 1; on success leave both loops
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS failed: re-read ctl and restart the outer loop if the run state changed
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // Now actually create the new thread:
        // the Worker constructor obtains its thread from the ThreadFactory
        Worker w = new Worker(firstTask);
        Thread t = w.thread;
        // Acquire the pool's main lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int c = ctl.get();
            int rs = runStateOf(c);
            /**
             * Back out (undo the workerCount increment and try to terminate) if the
             * ThreadFactory returned null, if the state is above SHUTDOWN, or if the
             * state is SHUTDOWN and firstTask != null.
             */
            if (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();
                tryTerminate();
                return false;
            }
            // Register the new Worker in the workers set
            workers.add(w);

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }
        // Start the new thread
        t.start();
        // It is possible (but unlikely) for a thread to have been
        // added to workers, but not yet started, during transition to
        // STOP, which could result in a rare missed interrupt,
        // because Thread.interrupt is not guaranteed to have any effect
        // on a non-yet-started Thread (see Thread#interrupt).
        // If the pool has moved to STOP but this thread was not interrupted, interrupt it now
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();
        return true;
    }

The addWorker method breaks down into two main phases:

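  • workerCount++: no real thread is created yet; workerCount is simply incremented by one with a CAS operation
  • Thread creation: a Worker and its thread are created, the Worker is added to the workers set, and the thread is handled according to the pool state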

3.1 workerCount++

The workerCount++ step corresponds to the code under the retry label above and consists of three main parts:

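  • Decide, based on the current pool state, whether a thread may be added; if not, return false immediately:

    • If the pool state is above SHUTDOWN, adding a Worker is refused
    • If the state is SHUTDOWN, a Worker with a non-null task cannot be added, and if the work queue is empty no Worker of any kind can be added
    • If the state is SHUTDOWN and the queue is not empty, a Worker with a null task may be added to help consume the queue
  • Decide, based on the current worker count, whether a thread may be added:

    • If core is true and the worker count has reached corePoolSize, no thread is added
    • If core is false and the worker count has reached maximumPoolSize, no thread is added
  • Perform workerCount++ via compareAndIncrementWorkerCount. On success, leave the loop. On failure, re-check the state: if the run state changed, go back to step 1; otherwise go back to step 2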
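The CAS-and-retry pattern of this loop can be isolated from the rest of the pool. The class below is a hypothetical sketch, not JDK code. BoundedCasCounter stands in for the workerCount half of ctl and deliberately omits the run-state re-check. tryIncrement reads the count and gives up if it has reached the limit; otherwise it retries compareAndSet until it wins. However many threads race, the count never exceeds the limit.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedCasCounter {
    // Hypothetical stand-in for the workerCount bits of ctl.
    private final AtomicInteger count = new AtomicInteger();

    // Mirrors the inner loop of addWorker, minus the run-state check.
    boolean tryIncrement(int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                 // like wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                  // like compareAndIncrementWorkerCount(c)
            // lost the race to another thread: re-read and retry
        }
    }

    int get() { return count.get(); }

    // Starts `threads` threads that each attempt one increment against `limit`.
    static int race(int threads, int limit) {
        BoundedCasCounter counter = new BoundedCasCounter();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> counter.tryIncrement(limit));
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(race(50, 8)); // 8
    }
}
```

race(50, 8) returns 8. Fifty threads compete, exactly eight increments succeed, and the rest give up once they observe the limit.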

3.2 Creating the Thread

Creating the thread involves the following steps:

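  • Create a Worker instance, which obtains its thread from the ThreadFactory
  • Acquire the pool's main lock for mutual exclusion
  • Check the pool state again. If the ThreadFactory returned no thread, if the state is above SHUTDOWN, or if it is SHUTDOWN and firstTask is not null, decrement workerCount, call tryTerminate, and return false
  • Add the Worker to the workers set and release the lock
  • Start the thread
  • If the pool has meanwhile moved to STOP but the thread has not been interrupted, interrupt it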
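The t == null back-out path can be triggered deliberately, because a ThreadFactory is allowed to return null when it declines to create a thread. This sketch makes the following assumptions for illustration: the class name NullThreadFactoryDemo and the pool configuration. The factory always returns null, and the queue is a SynchronousQueue with no consumer. As a result, both addWorker calls in execute() fail and the CAS increment is undone. The task then ends up with the default AbortPolicy, which throws RejectedExecutionException.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NullThreadFactoryDemo {
    static boolean rejectedWhenFactoryReturnsNull() {
        ThreadFactory refusing = r -> null;          // declines every thread request
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(),              // offer() fails: nobody waits in take()
                refusing);
        try {
            pool.execute(() -> { });
            return false;                              // not expected
        } catch (RejectedExecutionException expected) {
            // addWorker backed out each time, so no worker is registered
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected with no workers: " + rejectedWhenFactoryReturnsNull());
    }
}
```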

4 Worker

Section 3 showed that new threads are added through Worker, so let's look at the Worker class next:

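    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        // The thread this Worker runs in (null if the ThreadFactory failed)
        final Thread thread;
        // Initial task to run; possibly null
        Runnable firstTask;
        // Number of tasks this Worker has completed
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // State -1 inhibits interrupts until runWorker starts
            this.setState(-1);
            this.firstTask = firstTask;
            // The Worker passes itself to the ThreadFactory, so thread.start() ends up in run()
            this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
        }
        ......
        public void run() {
            ThreadPoolExecutor.this.runWorker(this);
        }
    }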

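Worker implements Runnable and holds its own thread variable, created by passing the Worker itself to the ThreadFactory. Its overall logic is now clear: it maintains a thread instance and uses it to run the submitted Runnable tasks. It also extends AbstractQueuedSynchronizer, which serves as a simple lock that runWorker acquires around each task.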
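The Worker hands itself to the ThreadFactory, so a custom factory receives each Worker as the Runnable of the thread it creates. The sketch below is an illustrative assumption, not part of the JDK. NamedThreadFactory names threads with a hypothetical "worker-" prefix and a counter. The demo reads back the name of the thread that ran a task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class NamedThreadFactoryDemo {
    // Hypothetical factory: names threads prefix + 1, prefix + 2, ...
    static final class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger(1);
        private final String prefix;

        NamedThreadFactory(String prefix) { this.prefix = prefix; }

        @Override
        public Thread newThread(Runnable r) {
            // r is the Worker; starting the thread runs Worker.run() -> runWorker()
            return new Thread(r, prefix + seq.getAndIncrement());
        }
    }

    static String nameOfWorkerThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new NamedThreadFactory("worker-"));
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorkerThread());
    }
}
```

With a fresh pool, nameOfWorkerThread() returns worker-1, because the first Worker's thread is the first one the factory creates.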

4.1 runWorker

In addWorker, once the Worker instance has been created, the start method of its thread field is called, which in turn runs the Worker's run method:

public void run() {
  ThreadPoolExecutor.this.runWorker(this);
}

So next, let's look at ThreadPoolExecutor's runWorker method:

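    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Whether the worker terminates because of an exception
        boolean completedAbruptly = true;
        try {
            // task is non-null only for the Worker's first task;
            // after that, tasks are fetched from the queue via getTask()
            while (task != null || (task = getTask()) != null) {
                // Lock the Worker so other threads cannot interrupt it while a task runs
                w.lock();
                clearInterruptsForTaskRun(); // clear stale interrupts unless the pool is stopping
                try {
                    // Hook run before each task; empty in this class. A subclass can throw
                    // from it to make the Worker exit without running the task
                    beforeExecute(w.thread, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook run after each task with the exception (if any); empty in this
                        // class, subclasses can use it to flag tasks that failed
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // drop the finished task so it can be garbage-collected
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Handle the worker's exit
            processWorkerExit(w, completedAbruptly);
        }
    }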

The logic of the whole method is fairly simple:

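  • task is non-null only for the Worker's first task; otherwise tasks are fetched from the queue via getTask()
  • Acquire the Worker's lock so that other threads cannot interrupt it while it runs a task
  • Call beforeExecute before running the task; a subclass can use it to make the Worker exit without running the task, and it is empty in ThreadPoolExecutor itself
  • Run the task
  • Call afterExecute after the task; a subclass can use it to flag that a Worker was terminated by an exception, and it is empty in ThreadPoolExecutor itself
  • Handle the Worker's exit in processWorkerExit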
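The two hooks can be observed by subclassing ThreadPoolExecutor. In this sketch, HookedPool, its counters, and the single-thread configuration are illustrative assumptions. The factory installs a no-op uncaught-exception handler only to stop the failing task from printing a stack trace. With execute(), the Throwable passed to afterExecute is the exception the task threw. Tasks submitted with submit() are wrapped in a FutureTask, which captures the exception itself, so afterExecute would see null there.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), quietThreads());
    }

    // Threads from this factory ignore uncaught exceptions (keeps the demo output clean).
    private static ThreadFactory quietThreads() {
        return r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        before.incrementAndGet();       // runs in the worker thread just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        after.incrementAndGet();        // runs in the finally block after task.run()
        if (t != null)
            failed.incrementAndGet();   // the exception thrown by the task (execute() path)
    }

    // Runs two normal tasks and one failing task, then waits for termination.
    static int[] runDemo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.before.get(), pool.after.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(runDemo()));
    }
}
```

runDemo() should return {3, 3, 1}: every task passes through both hooks, and one of them failed. The failing task also kills its worker thread (completedAbruptly stays true). processWorkerExit then starts a replacement Worker that runs the remaining queued task.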

4.2 getTask

Next, let's look at the getTask method used by runWorker:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // At STOP or beyond, queued tasks are not processed: decrement workerCount and return null.
            // At SHUTDOWN with an empty queue, likewise decrement workerCount and return null.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

This code is critical. First, look at its two local variables:

boolean timedOut = false; // whether the previous poll() timed out
boolean timed;            // whether this Worker may exit after timing out (is subject to culling)

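When wc > corePoolSize (or allowCoreThreadTimeOut is true), idle Workers may be removed, so timed is true. When wc <= corePoolSize, core threads must not be removed, so timed is false.
timedOut starts out false. If timed is true, the Worker fetches a task with poll(keepAliveTime); if poll returns a task, it is returned. If poll times out, the Worker has been idle for keepAliveTime while the pool holds more than corePoolSize threads, so the Worker should be removed. In that case r is null and timedOut is set to true. On the next iteration the condition wc <= maximumPoolSize && ! (timedOut && timed) is false, so the Worker decrements workerCount via compareAndDecrementWorkerCount and returns null, which makes it exit. If timed is false, workQueue.take() is called instead. This happens when the thread count is at most corePoolSize and allowCoreThreadTimeOut is off. take() blocks until a task becomes available or the thread is interrupted.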
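The effect of keepAliveTime and timed on the pool size can be watched from outside. This sketch makes several illustrative assumptions: the class name KeepAliveDemo, the sizes, the 100 ms keep-alive, and the 1-second waits; the timing is only approximate. It fills a pool (corePoolSize = 1, maximumPoolSize = 3, SynchronousQueue), releases the tasks, and samples getPoolSize(). The two extra Workers time out in poll() and exit, while the last one blocks in take(). After allowCoreThreadTimeOut(true), that last Worker also polls with a timeout and exits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    // Returns {sizeWhileBusy, sizeAfterIdle, sizeAfterCoreTimeout}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {               // each task holds its worker until released
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = pool.getPoolSize();          // 1 core + 2 non-core workers
        release.countDown();
        sleep(1000);                            // timed workers' poll() times out
        int idle = pool.getPoolSize();          // the remaining worker blocks in take()
        pool.allowCoreThreadTimeOut(true);      // now the core worker polls with a timeout too
        sleep(1000);
        int afterCoreTimeout = pool.getPoolSize();
        pool.shutdown();
        return new int[] { busy, idle, afterCoreTimeout };
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

On an otherwise idle machine, the expected result is {3, 1, 0}.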
