JUC学习 - 深入剖析线程池(ThreadPoolExecutor)(补)

接上一篇博客 https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501

6、Worker 类

下面的是在 Worker 类上的官方的一段注释:

/**
  * Class Worker mainly maintains interrupt control state for
  * threads running tasks, along with other minor bookkeeping.
  * This class opportunistically extends AbstractQueuedSynchronizer
  * to simplify acquiring and releasing a lock surrounding each
  * task execution.  This protects against interrupts that are
  * intended to wake up a worker thread waiting for a task from
  * instead interrupting a task being run.  We implement a simple
  * non-reentrant mutual exclusion lock rather than use
  * ReentrantLock because we do not want worker tasks to be able to
  * reacquire the lock when they invoke pool control methods like
  * setCorePoolSize.  Additionally, to suppress interrupts until
  * the thread actually starts running tasks, we initialize lock
  * state to a negative value, and clear it upon start (in
  * runWorker).
  */

从注释中我们可以了解到 Worker 类是内部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。从下面的代码也可以看出:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

Worker 类主要维护正在运行任务的线程的中断控制状态,以及其他次要的记录。

这个类继承了AbstractQueuedSynchronizer类,以简化获取和释放锁(该锁作用于每个任务执行代码)的过程。这样可以防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。

我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁(ReentrantLock),因为我们不希望工作任务在调用setCorePoolSize之类的池控制方法时能够重新获取锁。另外,为了在线程真正开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。

我们来看一下它的部分代码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
}

Worker 这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker 执行任务的模型如下:
JUC学习 - 深入剖析线程池(ThreadPoolExecutor)(补)
关于 addWorker 方法的介绍在上一篇博客中有介绍,需要的看【https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501

我们来看一下 Worker 类的完整的代码:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // Worker持有的线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 初始化的任务,可以为null
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
    Worker(Runnable firstTask) {
        // 设置AQS的同步状态
        // 		- state:锁状态,-1为初始值,0为unlock状态,1为lock状态
        setState(-1); // inhibit interrupts until runWorker  在调用runWorker前,禁止中断
        this.firstTask = firstTask;
        // 线程工厂创建一个线程
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  
      *
      * 将主运行循环委托给外部 runWorker
      */
    public void run() {
        runWorker(this);   // runWorker()是ThreadPoolExecutor的方法
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.  0代表“没被锁定”状态
    // The value 1 represents the locked state.  1代表“锁定”状态

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
	
    /**
      * 尝试获取锁的方法
      * 	- 重写 AQS 的 tryAcquire()
      */
    protected boolean tryAcquire(int unused) {
        // 判断原值为0,且重置为1,所以state为-1时,锁无法获取。
        // 每次都是 0->1 ,保证了锁的不可重入性
        if (compareAndSetState(0, 1)) {
            // 设置exclusiveOwnerThread=当前线程
            // 独占模式
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

   /**
     * 尝试释放锁
     *     - 不是state-1,而是置为0
     */
    protected boolean tryRelease(int unused) {
    	// 清除当前占用的线程
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

   /**
     * 中断(如果运行)
     * shutdownNow时会循环对worker线程执行
     * 且不需要获取worker锁,即使在worker运行时也可以中断
     */
    void interruptIfStarted() {
        Thread t;
        // 如果state>=0、t!=null、且t没有被中断
        // new Worker()时state==-1,说明不能中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

关于 AQS 的描述看博主的另外一篇文章 【https://blog.csdn.net/qq_43605444/article/details/121705312?spm=1001.2014.3001.5501

7、runWorker 方法

  • 方法说明:可以说,runWorker(Worker w) 是线程池中真正处理任务的方法,前面的execute() 和 addWorker() 都是在为该方法做准备和铺垫。
  • 参数说明:
    1. Worker w:封装的Worker,携带了工作线程的诸多要素,包括 Runnable(待处理任务)、lock(锁)、completedTasks(记录线程池已完成任务数)
  • 下面是具体的代码分析:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
    w.unlock(); // allow interrupts
    
    // 线程退出的原因,true是任务导致,false是线程正常退出
    boolean completedAbruptly = true;
    try {
        // 当前任务和从任务队列中获取的任务都为空,方停止循环
        while (task != null || (task = getTask()) != null) {
            // 上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
           /**
             * 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
             * 条件1:线程池状态>=STOP,即STOP或TERMINATED
             * 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,
             * 即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,
             * 使线程池处于STOP或TERMINATED),
             * 条件1与条件2任意满足一个,且wt不是中断状态,则中断wt,否则进入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 当前线程调用interrupt()中断
                wt.interrupt();
            try {
                // 执行前(空方法,由子类重写实现)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行后(空方法,由子类重写实现)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 完成任务数+1
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 处理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法的执行过程是:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

8、getTask 方法

  • 方法说明:由函数调用关系图可知,在ThreadPoolExecutor类的实现中,Runnable getTask() 方法是为 void runWorker(Worker w)方法服务的,它的作用就是在任务队列(workQueue)中获取 task(Runnable)。
private Runnable getTask() {
    // 最新一次poll是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       /**
         * 条件1:线程池状态SHUTDOWN、STOP、TERMINATED状态
         * 条件2:线程池STOP、TERMINATED状态或workQueue为空
         * 条件1与条件2同时为true,则workerCount-1,并且返回null
         * 注:条件2是考虑到SHUTDOWN状态的线程池不会接受任务,但仍会处理任务
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
       /**
         * 下列两个条件满足任意一个,则给当前正在尝试获取任务的工作线程设置阻塞时间限制
         *(超时会被销毁?不太确定这点),否则线程可以一直保持活跃状态
         * 1.allowCoreThreadTimeOut:当前线程是否以keepAliveTime为超时时限等待任务
         * 2.当前线程数量已经超越了核心线程数
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程
        // 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时)
        // 条件2:wc > 1或任务队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 移除工作线程,成功则返回null,不成功则进入下轮循环
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 执行到这里,说明已经经过前面重重校验,开始真正获取task了
        try {
            // 如果工作线程阻塞时间受限,则使用poll(),否则使用take()
            // poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            // r不为空,则返回该Runnable
            if (r != null)
                return r;
            // 没能获取到Runable,则将最近获取任务是否超时设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 响应中断,进入下一次循环前将最近获取任务超时状态置为false
            timedOut = false;
        }
    }
}

9、processWorkerExit 方法

  • 方法说明:processWorkerExit(Worker w, boolean completedAbruptly),执行线程退出的方法
  • 参数说明:
    1. Worker w:要结束的工作线程。
    2. boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true。
  • 下面让我们看看 processWorkerExit 的源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
   /**
     * 1.工作线程-1操作
     * 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
     * 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2.从线程set集合中移除工作线程,该过程需要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将该worker已完成的任务数追加到线程池已完成的任务数
        completedTaskCount += w.completedTasks;
        // HashSet<Worker>中移除该worker
        workers.remove(w);
    } finally {
        // 释放锁
        mainLock.unlock();
    }

    // 3.根据线程池状态进行判断是否结束线程池
    tryTerminate();

   /**
     * 4.是否需要增加工作线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,
     * 线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

10、线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程
  • prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程,并返回初始化的线程数
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);   // 注意传进去的参数是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))   // 注意传进去的参数是null
        ++n;
    return n;
}

参考文章:

1、https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 【美团技术团队博客】

大家可以去看一下美团技术团队写的关于线程池的文章,里面有线程池在业务中的实践。

上一篇:can only join a child process


下一篇:Nginx基础学习