通过ReentrantLock源代码分析AbstractQueuedSynchronizer独占模式

1. 重入锁的概念与作用

      reentrant 锁意味着什么呢?简单来说,它有一个与获取锁相关的计数器,如果已占有锁的某个线程再次获取锁,那么lock方法中将计数器就加1后就会立刻返回。当释放锁时计数器减1,若计数器不为0,说明线程仍然占有锁;若计数器值为0,线程才会真正释放锁。

可重入锁可以避免同一个线程嵌套(或者说递归)获取锁时的死锁现象。

考虑下面这样一种情况

public class LockAnalysis {
private Lock l = new ReentrantLock();
public void funA(){
l.lock();
System.out.println("funA do something");
l.unlock();
}
public void funB(){
l.lock();
System.out.println("funB do something");
funA();
l.unlock();
}
}

如果不是可重入锁,那么线程调用这个类的对象的funB方法时就会导致死锁现象。

可重入锁的好处是,线程当前的操作需要加锁时,直接加锁即可,不需要考虑已加锁的代码块中是否又进行了加锁的操作。

如果不是可重入锁,那么线程调用这个类的对象的funB方法时就会导致死锁现象。

可重入锁的好处是,线程当前的操作需要加锁时,直接加锁即可,不需要考虑已加锁的代码块中是否又进行了加锁的操作。

2. ReentrantLock的内部结构

内部类

Sync extends AbstractQueuedSynchronizer
NonfairSync extends Sync
FairSync extends Sync

Sync继承了AbstractQueuedSynchronizer,并依据ReentrantLock的语义实现了相关方法,其它两个内部类分别表示公平锁和非公平锁所对应的同步队列,它们主要是在tryAcquire方法和lock方法的实现上采取了不同的策略,以符合公平锁和非公平锁的语义。

重要数据成员

private final Sync sync;

如果构造的是公平锁,sync就引用FairSync的对象,如果构造的是非公平锁sync就是NonfairSync对象的引用。

构造函数

public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

无参数时,默认为非公平锁。有参数时,若参数为false,则为公平锁。

4. AQS的内部结构

锁的获取过程实际上对应了AQS对状态的改变过程。现在我们就要对AQS类进行一个简要的介绍。在本文中做如下规定:未能获得锁的线程会进入队列中排队获取锁,我们称这个队列为等待(锁的)队列。线程调用Condition对象方法的await方法会阻塞,阻塞的线程会进入一个队列中等待其它线程调用signal方法唤醒,我们称这个队列为条件队列。AbstractQueuedSynchronizer简称为AQS。

重要的数据成员

private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;

state:对于锁而言它表明了锁的状态,0表示没有线程占有锁;非0表示已有线程占有锁,非0值表示可重入次数。

未能获取锁的线程就进入队列进行等待,数据成员head和tail 表示等待队列的头和尾。

      内部类Node

static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus; volatile Node prev;
volatile Node next;
volatile Thread thread; Node nextWaiter;
……//其它暂时省略
}

内部类Node表示了等待队列中的节点,Node中的thread是因未获取锁而等待的线程的引用。prev 和next分别指向了前一个节点和后一个节点。有时候在本文中线程和节点是同一个意思。

通过ReentrantLock源代码分析AbstractQueuedSynchronizer独占模式

AQS队列的示意图

头节点:它是一个哑节点,它的下一个节点开始才表示因未能获取锁而处于等待锁的节点。

准确的说队列是由prev引用串接在一起的单向链表,节点中next引用只是一个辅助作用,在大多数情况下可以根据next找到当前节点的下一个节点。

重点要介绍的是waitStatus,它表示节点的状态,它有五种取值

0

新创建的节点、出列的节点、队尾的节点、刚从条件队列中进入等待队列中的节点,都处于这种状态

CANCELLED = 1

表示当前节点表示的线程因超时或者被中断而处于取消的状态。处于取消状态的节点会从队列中移除,并从获取锁的方法中返回(对于可中断获取锁的方法是以抛出异常的方式返回)

SIGNAL = -1

表示当前节点出列时它的下一个节点需要唤醒

CONDITION = -2

表示当前节点位于条件队列中

PROPAGATE = -3

表示共享模式下,若当前节点被唤醒,它的下一个节点也可以被唤醒

nextWaiter 有两个作用,一个是指明了AQS是共享模式还是独占模式(用SHARED    和      EXCLUSIVE两种值来区分);另一个作用是用于条件队列的节点指针。

内部类ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
……
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
……//其它暂时省略
}

ConditionObject类的内部实际上维护了一个条件队列,firstWaiter和lastWaiter表示了队列的头和尾。一个锁仅有一个等待队列,但可以对应多个条件队列(当然一个节点不能同时位于条件队列和等待队列中,也不能同时位于多个条件队列中)。由于ConditionObject是个AQS的内部类,正好满足多个条件队列对应一个等待队列,这可以看做内部类特性的一个经典应用。

分析ReentrantLock中AQS的工作原理需要把握几点

1. 任何时候都有可能有多个线程来竞争获取锁

2. 任何时候都有可能有多个线程竞争入列

3. 唤醒的线程不一定能立刻运行,可能位于就绪状态

4. 线程随时可以由运行态转变为就绪态

5. 锁负责状态的定义

6. AQS负责队列的维护

5. 非公平不可中断锁的获取

public void lock() {
sync.lock();
}

非公平不可中断锁的获取调用了ReentrantLock类中的lock方法,实际上内部调用了NonfairSync类的lock方法。

final void lock() {
if (compareAndSetState(0, 1)) //快速尝试获取锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

NonfairSync类内部的lock方法首先尝试快速获取锁(而不考虑是等待队列是否有中节点还在等待获取锁,这是非公平语义的体现),如果成功直接返回,如果失败则调用了AQS类的acquire方法。

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire的作用是尝试获取锁,获取成功,lock方法返回,线程继续执行;获取失败,调用addWaiter创建一个新的节点到等待队列中。acquireQueued的作用是确保当前线程阻塞后能被唤醒。

注意:这里调用的不是AQS中的tryAcquire,而是调用了被NonfairSync类覆盖的tryAcquire方法。而NonfairSync类中tryAcquire实际上又调用了Sync类中的nonfairTryAcquire方法。

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

对于非公平锁,上述代码才是tryAcquire的核心,我们现在对它进行简要的分析。

1. 判断当前状态(AQS的state字段)是否为0,

1.1若为0,尝试用原子类操作将其置为1(任何时候都有可能有多个线程来竞争获取锁,所以必须使用原子类操作,这里也体现了锁的非公平特性,即未入列就可以尝试获取锁)。

1.1.1成功:说明已获取锁,通过setExclusiveOwnerThread将占有锁的线程标记当前线程,然后返回true,ture说明获取锁成功。

1.1.2 失败:返回false,说明同时有其它线程也来获取锁,并且当前线程获取锁失败。

1.2 若为非0:判断要获取锁的线程是否是当前线程(可重入锁语义的实现)。

1.2.1 是:state = state + 1 (即重入次数加1),然后返回true,ture说明获取锁成功。

1.2.2 否:返回false,说明已有线程占有锁,当前线程获取锁失败了。

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

现在我们再来分析一下addWaiter,它的主要作用就是将等待的线程入列

1. 构造一个节点,节点的nextWaiter的值为EXCLUSIVE(ReentrantLock中都是这个值,它表示了独占模式,即同时只能有一个线程拥有锁)

2. 判断队列的尾节点是否为空

2.1 否:通过原子操作入列,注意pred.next = node 不是原子操作不能保证入列后立刻被执行(也就是说如果一个节点的next值为null不能说明它后面没有节点,next不为null说明它一定有后继节点)。然后返回新构造的节点的引用,程序结束。

3. 调用enq方法。

4. 返回新构造的节点的引用

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

继续分析enq方法。

1. 判断当前队列的尾部是否为null,这里再次判断的目的就是考虑到多个线程会可能会先后执行enq这段程序,只有一个线程新创建的节点能作为尾节点,其它线程建立的节点都会被垃圾回收线程回收。

1.1 是:创建一个节点,尝试通过原子操作将其作为队尾(此时它也是队列的头部)。

1.1.1 成功:说明当前线程为队列设置了尾节点,回到步骤1

1.1.2 失败:说明其它线程已为队列设置了尾节点,回到步骤1

1.2 否:尝试入列

1.2.1 成功:结束

1.2.2 失败:说明有其它线程也在入列,发生了碰撞,并且当前线程竞争失败了。回到步骤1,再来一次循环。

以上就是addWaiter的代码分析,现在我们再来分析一下acquireQueued方法的代码分析。

acquireQueued的主要作用是确认是否要阻塞当前线程,如果要阻塞,该方法要确保前一个节点出列时,当前线程能被唤醒。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 不可中断锁中永远不会被执行
}
}

如果入列后发现当前节点的上一个节点是头节点就会调用setHead(node),它的作用是当节点成功获取锁以后,将当前作为头节点,上一个头节点会出列。注意头节点的更改是在成功获取锁之后,而不是在释放锁的时候。这样做的目的是考虑到非公平锁状态下,当前节点会和未入列的节点(调用nonfairTryAcquire方法中的compareAndSetState(0, acquires)语句)竞争获取锁,当未入列的节点获取到锁时,队列的头节点应该保持不变。

p == head && tryAcquire(arg) 的作用:如果当前节点的上一个节点是头节点,当前线程要再次尝试获取锁。这样的原因,就是保证唤醒过程不会出现死链的情况。为了解释的清楚一下,我现在不得不把unlock的核心代码release方法的实现提前放出来。

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

从释放锁的代码可以看出,如果头节点的waitStatus为0时,它不会唤醒头节点之后的下一个节点。

我们现在可以举个例子,假设这样一种情况,线程a已占有锁,b,c,d线程因锁已被占有而竞争入列,a在成功释放锁后,即tryRelease(arg)返回true时,b,c,d线程还没有完成头节点的设置(原谅这三个肉肉的线程,由于调度的原因,他们动作比较慢)。这时a线程以为没有线程需要它唤醒,即unparkSuccessor(h)不会执行,它就拍拍屁股走了。此时三个线程才入列完毕(head<-b<-c<-d),注意这个时候其实锁时空闲的,如果这个时候b把自己阻塞了,那有整个等待队列中的节点都不会被唤醒。所以,某个节点发现自己的上一个节点是头节点时,还要再次尝试获取锁,如果失败,要确定头节点的waitStatus必须为SIGNAL然后再次获取锁(这是为了防止将头节点的waitStatus设置为SIGNAL之前,锁恰好被释放),又失败了才能安心的阻塞自己(调用parkAndCheckInterrupt方法中的LockSupport.park(this)阻塞自己)。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {

/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire的作用:当前线程就要把自己阻塞了,在阻塞之前务必要保证前一个节点成功释放锁后,会把当前线程唤醒。不可中断锁中节点的状态不会有waitStatus>0的情况,因此我们将不会执行的代码删除。当shouldParkAfterFailedAcquire返回true时,说明可以放心阻塞当前线程了,这时就会调用parkAndCheckInterrupt()方法来阻塞当前线程。当该节点被唤醒时,会继续从parkAndCheckInterrupt()方法中的下一条语句继续执行。

获取锁过程的几点说明:

(1) 如果在没有其它线程占有锁的情况下成功获取锁,则该线程不会进入队列

(2) 节点的出列(也就是头节点的改变)是在成功获取锁之后,而不是释放锁的时候

(3) cancelAcquire(node)在不可中断锁中不会执行

6. 可中断锁的获取

可中断锁响应中断只有两个时刻,一个是未入列之前调用acquireInterruptibly方法时,另一个是被唤醒后从parkAndCheckInterrupt方法中返回的时候。如果线程调用lockInterruptibly抛出异常,线程就会从lockInterruptibly方法中返回,并捕获异常。

lockInterruptibly调用了AQS的acquireInterruptibly,它实际上又调用了doAcquireInterruptibly

private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireInterruptibly方法的代码基本和acquireQueued类似,区别就在于当线程检测到自己的中断标志位被设置后(在parkAndCheckInterrupt方法中实现)会抛出异常 InterruptedException,使得finally块中的cancelAcquire方法得到执行。

private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;

node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {

unparkSuccessor(node);
}
node.next = node; // help GC
}
}

代码的基本思路就是,从这个队列中删头节点的下一个节点(也就是当前线程自身对应的节点),并唤醒下下一个节点,因为线程一旦入列并处于等待状态,只有被唤醒以后才能响应中断,而被唤醒的前提时节点是必须位于头节点的下一个节点(前面的节点都以陆续出列)。

cancelAcquire方法是为了可中断锁和超时锁的取消操作共同设计的,可中断锁中没有用到的代码都以横线的方式删除了。

7. 超时可中断锁的获取

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

deadline表示了截止时间,nanoTimeout表示了尝试获取锁剩的余时间,当nanoTimeout大于一个阀值时(这主要是考虑到park操作和unpark操作所耗费的时间,该值由spinForTimeoutThreshold表示),当前线程才会调用LockSupport.parkNanos(this, nanosTimeout)将自己阻(阻塞的时间为nanosTimeout),否则继续执行续循环体。

可以看出获取锁给定的时间在执行的时候不是一个精确的时间,实际上很可能会大于获取锁所规定的时间。不是一个精确的时间有两方面的原因,一个是由于系统的调度使得线程由运行态转为就绪态,而处于就绪态的时间不固定;另一个是执行park和unpark方法的代码需要的时间无法精确给出。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
…………
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

pred.next = node;
}
…………
}

此时在不可中断锁中shouldParkAfterFailedAcquire被划掉的代码才会实现

private void cancelAcquire(Node node) {
…………
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
…………
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
…………
}
}

基本思想就是跳过那些处于取消状态的节点,如果取消的节点是头节点的下一个节点,则将下下个节点唤醒。与可中断锁不同,这个时候的才可能有多个线程同时执行cancleAcquire方法,划掉的代码才可能会被执行。

8.锁释放代码的分析

public void unlock() {
sync.release(1);
}

释放锁的代码实际上调用了AQS的release方法

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

代码的主要功能:1. tryRelease释放锁     2. unparkSuccessor唤醒头节点的下一个节点

上面代码中最重要就是tryRelease方法。

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

这里调用的是Sync类中的的tryRelease方法。对于可重入锁,每执行一次unlock方法,可重入次数减1,当state的值为0时,才真正释放了锁。

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

unparkSuccessor传入的参数是头结点,

当头节点的next值为null时,不能说明队列为空,而要从后往前寻找需要唤醒的节点。我们将当前节点称为A,A的前一个节点称为B。当节点A入列时,无法对当前节点的prev和前一个节点B的next值同时进行原子操作。有可能正要对B节点的next引用赋值时,线程发生了调度,导致前B节点的next值为null,但实际上A节点已入列,并且位于B节点的后面。

被唤醒的线程从parkAndCheckInterrupt()继续执行。

9. Condition的await方法和signal方法源码分析

下面的代码表示阻塞前await方法中执行的代码。

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;

}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);

}

注意,使用await方法前必须要获取锁。我们现在await方法阻塞当前线程之前的代码,唤醒后执行的代码我们用删除线划掉。

如果在线程在调用await方法之前就被中断了,那么await会直接抛出异常,此时当前线程没有执行释放锁的状态。

一个线程调用await方法,说明这个线程必定处于运行(或就绪状)态,处于运行态(或就绪态),那么它必定不会在等待队列中,也没有与之对应节点。所以addConditionWaiter内部是新建一个节点,并加入到条件队列中。

fullyRelease 方法的作用是释放锁,对于可重入锁,释放的过程是将AQS的status值(可重入次数)存储在savedState变量中,然后通过原子类操作将status的值直接更改为0。而不是像unlock方法那样依次递减为0。

isOnSyncQueue 判断是否在等待队列中,显然此时不在等待队列中(而位于条件队列中),至于为什么要进行这个判断,还没有彻底理解。

最后线程调用LockSupport.park(this)将自己阻塞。

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

当其它线程调用这个条件队列的signal方法时(调用signal方法时前必须获取锁),条件队列将头节点从条件队列中取出,然后加入到等待队列中。signal方法主要调用了dosignal方法。

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; /*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

加入到等待队列主要由transferForSignal方法实现,首先将节点的状态由CONDITION修改为0,然后调用enq方法入列(注意enq方法返回的是当前节点入列后的前一个节点),如果等待队列中已入列的前一个节点的处于取消状态,要将已入列的节点唤醒(这是为了防止死链);否则就要等到等待队列中该节点之前的已入列的节点依次出列(或者被取消)才能被唤醒。被唤醒后继续回到await中执行(被唤醒的前提是位于条件队列的队首)。

下面的代码表示唤醒后await方法中执行的代码。

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;

while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

检查位于条件队列中的时候是否被中断,如果是,跳出循环。否则判断循环条件isOnSyncQueue(node),显然,singal方法再唤醒前已将队首节点加入到等待队列中,所以不满足循环条件。执行acquireQueued(node, savedState),这个方法前面分析过,如果获取锁失败,则线程会在这里被阻塞,直到该节点成为了头节点的下一个节点。不同的是若果获取锁成功,这里是将status的值直接更新为可重入次数(就是前面保存的savedState的值),然后从await方法中返回,而不是像重入加锁操作那样每次累加1。

参考博客

[1] http://ifeve.com/jdk1-8-abstractqueuedsynchronizer/

[2] http://www.tuicool.com/articles/RJ3Eza2

[3] http://www.ibm.com/developerworks/cn/java/j-jtp10264/index.html

上一篇:MJViewController的view的创建


下一篇:.NET跨平台之旅:将示例站点升级至 ASP.NET Core 1.1