Condition的await()和signal()流程

介绍

Conditionj.u.c包下提供的一个接口。
可以翻译成 条件对象,其作用是线程先等待,当外部满足某一条件时,在通过条件对象唤醒等待的线程。ArrayBlockingQueue就是通过Condition实现的。

先看一下Condition接口提供了哪些方法:

/**
 *  条件对象
 */
public interface Condition {

    /**
     * 让线程进入等待,如果其他线程调用同一Condition对象的notify/notifyAll,那么等待的线程可能被唤醒
     */
    void await() throws InterruptedException;

    /**
     * 不抛出中断异常的await方法
     */
    void awaitUninterruptibly();

    /**
     * 带超时的await
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 带超时的await(可指定时间单位)
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 带超时的await(指定截止时间)
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 唤醒等待的线程
     */
    void signal();

    /**
     * 唤醒所有线程
     */
    void signalAll();
}

Condition接口主要提供了两类方法——让线程等待的方法(await()等)和唤醒线程的方法(signal())。

AQS内部提供了Condition接口的实现——ConditionalObject。它内部的字段如下:

    private static final long serialVersionUID = 1173984872572414699L;
    //该ConditionObject维护的等待队列的头节点
    private transient Node firstWaiter;
    //该ConditionObject维护的等待队列的尾节点
    private transient Node lastWaiter;

非常简单,从上面的字段我们大概可以猜到Condition内部也维护了一个队列。
上篇文章中,我们已经分析锁实现的远离就是通过节点构成队列:让队列中除头节点外的其他线程都被Park,当头节点释放锁时,头节点唤醒下一个节点(Unpark线程),同时更新头节点。
举一反三,我们推测Condition唤醒功能的原理也是通过维护队列的节点。
接下来就通过分析源码,(主要是await()signal()方法),验证我们的猜测。


Condtion对象的获取

Condition对象的获取主要是通过Lock.newCondition()方法。
一个Lock对象可以返回多个Condition对象。
在对Condition进行等待或者唤醒前,都需要先持有Condition关联Lock对象,否则会抛出IllegalMonitorStateException异常。


Condition.await()过程

public final void await() throws InterruptedException {
            //如果线程已经被标记为中断,则抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //将线程添加进等待队列
            //注意等待队列和AQS维护的阻塞队列是两个不同的队列
            //正常流程当线程能调用await(),说明线程此时拥有锁,此时AQS的阻塞队列中,线程应该在head节点
            Node node = addConditionWaiter();

            //释放掉锁(如果释放失败,NODE的waitStatus被更新为CANCELLED)
            //同时因为释放掉了锁,该线程在阻塞队列中的节点也已经被移除
            int savedState = fullyRelease(node);

            //这里会将线程挂起,除非线程节点被移到AQS的阻塞队列或是线程被外部中断
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //检查是否是由于被中断而唤醒,如果是,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //在阻塞队列中尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //节点已经在阻塞队列中,与Condition的等待队列联系断开
            //对于SIGNAL唤醒的线程而言,SIGNAL时除了将节点移到阻塞队列,同时也清空了node.nextWaiter
            //而对于中断唤醒的线程而言,只是将节点移到阻塞队列,并没有清空node.nextWaiter(因为此时线程不持有,操作等待线程并非线程安全)
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //根据interruptMode 决定是否需要抛出异常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

await()方法主要可以分为以下几个过程:
1)线程将自己包装成节点,并添加到Condition的阻塞队列中
2)线程主动释放掉锁
3)线程进入自循环等待(主动通过LockSupport.park()),醒来时,检查自己是否已经被移动至Lock的阻塞队列
4)线程在阻塞队列中等待,直到获取锁(线程在等待时可能又会被LockSupport.park()挂起)
5)线程获取锁,检查自己在等待过程中(await()过程)是否有被中断
6)如果有需要,则清理节点与等待队列之间的联系
7)根据中断状态确定是否需要抛出异常,以便让await()的调用者可以响应线程的中断状态

从上面的流程,我们可以清楚的了解到以上步骤1和2,以及步骤5,6,7是持有锁的,步骤3和4并没有持有锁。了解这一点很重要,因为涉及某些方法是否需要以CAS来保证线程安全。
了解了大体流程,接下来就逐步分析各个步骤。

步骤1.线程包装成节点,添加进Condition的等待队列

这一步骤主要是addConditionWaiter()过程

    private Node addConditionWaiter() {
            Node t = lastWaiter;

            // 找出该ConditionObject的等待队列中 真正未被取消的最后一个节点,并更新为lastWaiter
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            //将该线程包装成Node
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //如果此时ConditionObject队列为空,初始化链表且头节点为node
            if (t == null)
                firstWaiter = node;
            else //否则将node添加队尾
                t.nextWaiter = node;
            //更新链表尾节点为node
            lastWaiter = node;
            return node;
        }

主要找出等待队列的最后一个节点,将线程包装成Node,添加到队列的队尾。
这里要注意的一点是此时Node的waitStatusCONDITION。节点的waitStatus对判断等待是否被取消很重要,在等待队列中等待的节点状态应该为CONDITION,如果状态不为CONDITION,说明线程已经取消了等待(如果waitStatus为0说明被唤醒或中断)。

步骤2.线程释放锁

释放锁的步骤比较简单。主要通过fullRelease()更新AQSstate为0并且将AQS的拥有者置为null,同时唤醒阻塞队列中的后继节点。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //获取锁被持有的此时
            int savedState = getState();
            //让锁直接释放被持有的次数
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            //如果释放失败了,则将节点标记为CANCELLED
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

步骤3.线程挂起,进入循环等待

这一步比较关键,线程等待的动作都发生在这一步。

    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //检查是否是由于被中断而唤醒,如果是,则跳出循环
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
    }

线程执行到这,会被LockSupport.park()挂起。
如果此时线程被唤醒,线程会检查是否是因为中断,如果发生中断,还需要确定中断是否发生在SIGNAL前(如果发生在SIGNAL前,之后线程需要抛出异常,让外部响应)。
如果线程不是因为中断而唤醒,线程需要确认节点是否已经被移动至AQS的等待队列。如果没有被移动,则继续被挂起(防止假唤醒)。

checkInterruptWhileWaiting()就是用来检测线程在等待的时候是否被中断。

/**
         * 检查线程在WAITING状态期间,是否有被中断
         * 如果没有返回0;如果是在SIGNAL之前被中断,返回 THROW_IE;如果在SIGNAL之后被中断,则返回REINTERRUPT
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

如果线程被中断,还需要通过transferAferCancelledWait判断中断是否发生在SIGNAL之前。

final boolean transferAfterCancelledWait(Node node) {
        //CAS操作,期待值是CONDITION,说明此时唤醒是被取消(中断),因为如果是SIGNAL,那么waitingStatus不会CONDITION,而是0(可以见SIGNAL流程)
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //即使是取消的,也需要移到AQS的阻塞队列
            enq(node);
            return true;
        }

        //说明线程先收到了SIGNAL信号
        //此时要等SIGNAL信号处理完成
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

前文说了Node节点的waitStatus是一个很重要的状态,从它可以推断出线程节点发生了什么操作。
上述方法先用CAS尝试将更新节点的waitStatus为0,期待值为CONDITION
如果尝试成功,说明此时节点未被操作过(SIGNAL信号),线程是中断唤醒的,此时需要通过enq()将节点添加AQS阻塞队列,因为此时没有锁,所以enq()方法以CAS重试的方式保证线程安全。
如果尝试失败,说明线程收到了SIGNAL信号,节点将由负责SIGNAL的线程移动至阻塞队列。这里为了避免线程返回过早,在判断出线程还未移动至阻塞队列的情况下,会通过Thread.yeild()让出CPU时间。

步骤4.线程在阻塞队列中重新等待锁

这一步主要是通过acquireQueue()方法。该方法已经在上一篇文章中介绍过了,这里不过多介绍。
需要注意的一点是,即使线程在等待时被中断,仍然需要在AQS的阻塞队列中等待获取锁。因为外部没有办法在线程获取锁之前发现中断状态,而且即使线程抛出了中断异常,此时线程也是持有锁的,外部需要显式的释放。

步骤5.检查并设置中断状态

这一步很简单,主要就是通过步骤3中的checkInterruptWhilewaiting()方法返回值:0表示未中断,-1表示中断发生在SIGNAl之前,1表示中断发生在SIGNAL之后。

步骤6.清理节点与等待队列的联系

这里有两种情况,如果线程是因为SIGNAL唤醒的,在唤醒时调用signal()的线程已经清理了被唤醒节点与等待队列的关系。 因为那时唤醒线程持有锁,操作是安全的。
但是如果对于中断被唤醒的线程,唤醒时是不持有锁的,不能保证线程安全的清理唤醒节点与等待队列的关系。因此就将等待清理工作放在了获取锁之后。

//此方法并不保证线程安全,因此调用此方法时,必须要在获取锁的情况下调用
        //此方法的目的是为了整理Condition的等待队列,将非CONDITION状态的节点从等待队列中移除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

步骤7.确定中断状态,并决定是否抛出异常

这一步也很简单,就是根据interruptMode来确定是否抛出异常,如果interruptMode值为THROW_IE,说明线程被唤醒前先被中断,此时需要抛出InterruptedException


Condition.signal()过程

signal()过程比起await()要简单很多。既然await()过程是将节点添加到等待队列,那么signal()作为await()的逆过程,就是将节点从等待队列重新移动到AQS阻塞队列。

        /**
         * 唤醒等待节点
         * 主要的流程就是将节点从Condition的等待队列移到AQS的阻塞队列中,让其重新等待锁的获取
         */
        public final void signal() {
            //先验证唤醒者是否是锁的持有者
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //唤醒等待队列的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

为保证安全,先确定唤醒线程是否为锁的持有者。
之后找出等待队列中的头节点,将其唤醒。

        /**
         * 唤醒操作(此时占有锁,为线程安全),
         * 找出等待队列中第一个真正要被唤醒的节点,移动到阻塞队列,
         */
        private void doSignal(Node first) {
            do {
                //找出第一个需要唤醒的节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //断开和之后等待队列的联系,并更新等待队列的头节点
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null); //等待队列的头节点移动失败 且 等待队列中还有其他节点,则继续尝试其他节点
        }

因为头节点可能已经被取消,所以这里会一直在等待队列中从前往后找一个节点,开始唤醒。直到一个节点唤醒成功或者等待队列中没有节点需要唤醒。
上文也说了唤醒过程其实就是节点的移动过程。

    /**
     * 将节点从Condition的等待队列移动到AQS的阻塞队列(该方法只会在signal相关的方法中被调用)
     */
    final boolean transferForSignal(Node node) {
        //CAS操作更新节点的waitStatus,期待值为CONDITION
        //操作成功,说明没有其他线程在操作这个节点
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失败,说明节点已经被中断操作过,waitStatus已经变成了0
            return false;

        //这里说明CAS操作成功
        //应该是线程安全的
        //将节点添加到阻塞队列的队尾
        Node p = enq(node);
        //判断此时阻塞队列是否有前继节点等待,有就Park线程,等待前继节点唤醒
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

transferForSignal()操作其实就是先通过CAS操作确定等待队列中的节点还未被取消。如果CAS操作成功,则将其添加到阻塞队列,如果此时阻塞队列还需要等待锁,则让await()的线程继续被挂起。将给线程唤醒的任务(此时已经是获取锁的任务,不再是之前的await()任务)交给阻塞队列中的前继节点。

await()以及signal()过程的流程图

跟着流程图在走一遍,可以帮助巩固上述的知识点。
Condition的await()和signal()流程

总结

再借上篇文章重口味的比方梳理下这个流程。
当你排队进了WC的包厢,想要方便时,你觉得太脏了,于是你在包厢内留了张纸条,希望有人能在厕所包厢干净了在叫你过来上厕所(调用Condition的await ()),随后主动让出了包厢的使用权(释放锁)。后面在排队等着方便的人便进去了。
你离开后,到另一个地方边玩手机边等(移动到Condition的阻塞队列,并被Park),期间,也有一些人同样觉得厕所太脏,跑了出来,在外面等,并排在了你的后面。
过了一段时间,有人把打电话给你说厕所已经变干净了,可以去用了,你就重新回到厕所那排起了队伍,等待轮到你用厕所(acquireQueue)。因为那个人仅通知了你,没有通知其他因为嫌弃厕所脏,而跑出来的人,所以那些只能继续在那等别人去叫他们。
如果你在等待的时候突然有人说服了你,其实脏一点也无所谓(外部未给signal却被中断),你就主动从在外面等待走到了厕所前去排队等待继续去用厕所。

上一篇:如何快速开发树形列表和分页查询整合的Winform程序界面?


下一篇:Condition接口及其主要实现类ConditionObject源码浅析