JUC之AbstractQueuedSynchronizer-ConditionObject

更多关于AbstractQueuedSynchronizer介绍,请戳《JUC之AbstractQueuedSynchronizer基本介绍》、《JUC之AbstractQueuedSynchronizer共享模式》。


AbstractQueuedSynchronizer中提供一个内部类ConditionObject用于模拟线程等待(wait)和唤醒(notify)的功能。ConditionObject中提供一个等待队列用于存放等待状态的线程,提供await方法用于将当前线程加入到等待队列中,提供signal方法用于唤醒线程将其从等待队列中移除。和Synchronized类比,await相当于Object中wait,signal相当于Object中notify,所以当前线程在使用await和signal方法的时候,也必须要持有AbstractQueuedSynchronizer提供的锁(通过acquire方法成功获取到锁)。

JUC之AbstractQueuedSynchronizer-ConditionObject

 


█ 等待队列

ConditionObject中也提供了一个等待队列,AbstractQueuedSynchronizer中的等待队列也可以叫同步队列,队列中的线程在等待获取锁。在ConditionObject的等待队列中的线程是先前已经获取到锁了,自己主动放弃锁进入等待队列,等待某个持有锁的线程将其唤醒或者自己主动唤醒(超时等待)。

private transient Node firstWaiter;

private transient Node lastWaiter;

通过firstWaiter指向等待队列中的第一个等待节点,lastWaiter指定等待队列中的最后一个节点。节点与节点之间通过前一个节点的nextWaiter指向下一个节点。所以不同于AbstractQueuedSynchronizer中的等待队列(同步队列)是双向的,ConditionObject中的等待队列是单向链表。这里的firstWaiter、lastWaiter与AbstractQueuedSynchronizer中的head、tail意思是一样的。


█ await

持有锁的线程调用await方法将会主动放弃锁,将其加入等待队列中。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装成node对象添加到等待队列中    
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

①addConditionWaiter

将当前线程封装成node对象添加到等待队列中。在将当前线程节点加入到队列之前,其对等待队列中的已有元素做了有效性判断,去除waitStatus不为CONDITION=-2的线程节点。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

下面代码的逻辑即检查等待队列中节点的有效性,去掉等待队列中waitStatus不为CONDITION=-2的节点。如果线程被其他线程调用了interrupt方法的话,可以将其移除。当等待队列中没有线程节点的时候,lastWaiter是为null的。

Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
}

unlinkCancelledWaiters:

执行具体的检查逻辑。因为等待队列是单向链表,其逻辑是从等待队列的头节点开始遍历,如果头节点的waitStatus != Node.CONDITION则将firstWaiter的引用指向头结点的下一个节点,则该下一个节点就变成了新的头节点,如此往复进行。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // trail用于记录下一个有效的节点元素
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // 断开与下一个节点的连接,则t就变成了独立的对象,没有其他引用指向
            // 可以等待GC回收了。
            t.nextWaiter = null;
            // 将当前头结点的下一个节点设置成头结点
            if (trail == null)
                firstWaiter = next;
            else
            // 通过nextWaiter连接到队列中
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else // 下一个有效的节点
            trail = t;
        // 接着遍历下一个节点    
        t = next;
    }
}

②fullyRelease

加入等待队列后,要主动放弃其持有的锁。

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

release:

释放锁,并唤醒AbstractQueuedSynchronizer的同步队列中的第一个线程节点,让其继续竞争获取锁资源。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease由子类实现具体的放弃锁的过程。unparkSuccessor唤醒同步队列中的线程。

③如果当前线程不在同步队列中等待的话,让当前线程休眠。LockSupport.park(this);会一直阻塞当前线程,不会继续执行下去了,直到调用LockSupport.unpark唤醒它。

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);// 调用完这里的代码会一直阻塞在这里,直到持有锁的线程调用signal方法将其唤醒接着继续执行。
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

④将当前线程加入到AbstractQueuedSynchronizer的同步队列中。因为在上面的步骤中该线程已经放弃了锁,所以当它从阻塞状态唤醒之后,需要继续去执行获取锁资源的逻辑。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;

⑤继续检查等待队列中节点的等待状态,去除状态不为CONDITION=-2的节点。

if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();

当调用await后,代码逻辑会执行到LockSupport.park(this);,此时当前线程会一直阻塞,直到被其他线程唤醒再继续执行后面的代码逻辑。被其他线程唤醒的逻辑在signal中。


█ signal

持有锁的线程通过调用signal来唤醒在等待队列中的线程。signal每次只会唤醒一个线程。

public final void signal() {
    // 当前线程必须是持有锁的线程,否则会抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

①doSignal

唤醒等待队列中的第一个线程节点,如果成功则方法退出返回,如果没有将继续尝试下一个线程节点。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 断开头节点与队列的连接    
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal(first)返回false的时候表示节点没有唤醒成功,!transferForSignal(first)才为true,则进行(first = firstWaiter) != null逻辑,firstWaiter指向下一个线程节点,然后执行唤醒这个节点的逻辑。

②transferForSignal

唤醒node节点中持有的线程。

final boolean transferForSignal(Node node) {

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

当执行完signal方法之后,被唤醒的线程会接着从await方法的阻塞处开始向后继续执行后续的逻辑。即LockSupport.park(this);处向后执行。


█ 其他方法

(1)

await方法提供的是一直阻塞等待直到被唤醒。ConditionObject还提供了超时等待唤醒。

①awaitNanos

等待nanosTimeout个纳秒后自动唤醒

public final long awaitNanos(long nanosTimeout)

②awaitUntil

指定等待日期,直到达到该等待日期时间则主动唤醒。

public final boolean awaitUntil(Date deadline)

③await(long time, TimeUnit unit)

指定时长和时间单位等待。

public final boolean await(long time, TimeUnit unit)

④awaitUninterruptibly

await在执行时,如果调用线程中断了,会抛出异常,而awaitUninterruptibly方法不会抛出异常。

public final void awaitUninterruptibly()

(2)

signal每次只能唤醒等待队列中的一个线程,ConditionObject还提供了一个方法用于将等待队列中的所有等待线程都唤醒,即signalAll

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

 doSignalAll:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

(3)

①hasWaiters,判断是否有等待线程。(线程节点的waitStatus需要为Node.CONDITION才参与)

protected final boolean hasWaiters()

②getWaitQueueLength,获取等待队列中等待线程的个数。(线程节点的waitStatus需要为Node.CONDITION才参与统计)

protected final int getWaitQueueLength()

③getWaitingThreads,获取等待队列中的线程。线程节点的waitStatus需要为Node.CONDITION才参与)

protected final Collection<Thread> getWaitingThreads()

经过《JUC之AbstractQueuedSynchronizer基本介绍》《JUC之AbstractQueuedSynchronizer共享模式》以及本文,关于AbstractQueuedSynchronizer的内容就介绍到这里。

补充:

(1)在AbstractQueuedSynchronizer中定义的字段被volatile关键字修饰,为了让该字段在多线程下被一个线程修改其他的线程能够及时可见修改后的值(可见性)

// AbstractQueuedSynchronizer
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
// Node
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;

(2)在AbstractQueuedSynchronizer中对字段的修改使用了CAS操作来保证线程安全。

// 修改state
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 修改head
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 修改tail
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 修改Node.waitStatus
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}
// 修改Node.next
private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

 

上一篇:多线程进阶=>JUC编程


下一篇:JUC