并发编程之 AQS 源码剖析

并发编程之 AQS 源码剖析

前言

JDK 1.5 的 java.util.concurrent.locks 包中都是锁,其中有一个抽象类 AbstractQueuedSynchronizer (抽象队列同步器),也就是 AQS, 我们今天就来看看该类。

1.结构

并发编程之 AQS 源码剖析

我们看看该类的结构,该类被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的内部类所继承,而这些内部类都是这些锁的真正实现,不论是公平锁还是非公平锁。

也就是说,这些锁的真正实现都是该类来实现的。那么,我们就从这些锁开始看看是如何实现从锁到解锁的。

2. 重入锁的 lock 方法

我们先看看重入锁 ReentranLock 的 lock 方法。

    public void lock() {
sync.lock();
}

该方法调用了内部类的 sync 抽象类的 lock 方法,该方法的实现有公平锁和非公平锁。我们看看公平锁是如何实现的:

    static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L; final void lock() {
acquire(1);
}

调用了 acquire 方法,该方法就是 AQS 的的方法,因为 sync 继承了 AQS,而公平锁继承了 Sync,等于间接继承了 AQS,我们看看该方法。

    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法JDK注释 :

以独占模式获取对象,如果被中断则中止。通过先检查中断状态,然后至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,或者线程被中断之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可以使用此方法来实现 Lock.lockInterruptibly() 方法。

楼主来简单说一下该方法的作用:该方法会试图获取锁,如果获取不到,就会被加入等待队列等待被唤醒,这个其实和我们之前分析的 synchronized 是差不多的。

我们仔细看看该方法,首先是 tryAcquire 方法,也就是尝试获取锁,该方法是需要被写的,父类默认的方法是抛出异常。如何重写呢?抽象类定义一个标准:如果返回 true,表示获取锁成功,反之失败。

并发编程之 AQS 源码剖析

我们回到 acquire 方法,如果获取锁成功,就直接返回了,如果失败了,则继续后面的操作,也就是将线程放入等待队列中:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

我们先看看 addWaiter(Node.EXCLUSIVE) 方法:

    /**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

该方法注释:将当前线程放入到队列节点。参数呢?参数有2种,Node.EXCLUSIVE 是独占锁,Node.SHARED 是分享锁。

在 Node 类种定义了这两个常量:

    static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

独占锁是null,共享锁是空对象。

我们看看该方法的步骤:

  1. 创建一个当前线程的 Node 对象(nextWaiter 属性为 null, thread 属性为 当前线程)。
  2. 获取到末端节点,如果末端节点不为 null,则将末端节点设置为刚刚创建的节点的 prev 属性。

    2.1. 通过 CAS 设置末端节点为新的节点。如果成功,将刚刚创建的节点设置为老末端节点的next节点。最后返回。
  3. 如果 tail 末端节点是null,则调用enq 方法。创建一个末端节点,然后,将刚刚创建的末端节点设置为新节点的 prev 属性(此时的末端节点就是 head 头节点)。最后返回刚刚创建的 node 节点。

我们看看 enq 方法的实现:

    private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

该方法步骤如下:

  1. 死循环,获取到末端节点,如果是null,则使用CAS创建一个头节点(头节点此时也是null),并将头节点赋值末端节点。
  2. 由于刚刚CAS 成功,走else 逻辑,将末端节点赋值给新节点的 prev 属性,使用CAS设置新的末端节点为刚刚创建的 node对象。然后返回node 对象。

该方法主要就是初始化头节点和末端节点,并将新的节点追加到末端节点并更新末端节点。

我们会到 addWaiter 方法中,该方法主要作用就是根据当前线程创建一个 node 对象,并追加到队列的末端。

我们再回到 acquire 方法:

    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

addWaiter 方法会返回刚刚创建的node 对象,然后调用 acquireQueued 方法,我们进入该方法查看:

    final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

该方法步骤如下:

  1. 死循环。先获取 node 对象 prev 节点,如果该节点和 head 相等,说明是他是第二个节点,那么此时就可以尝试获取锁了。

    1.1 如果获取锁成功,就设置当前节点为 head 节点(同时设置当前node的线程为null,prev为null),并设置他的 prev 节点的 next 节点为 null(帮助GC回收)。最后,返回等待过程中是否中断的布尔值。
  2. 如果上面的两个条件不成立,则调用 shouldParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法。这两个方法的目的就是将当前线程挂起。然后等待被唤醒或者被中断。稍后,我们仔细查看这两个方法。
  3. 如果挂起后被当前线程唤醒,则再度循环,判断是该节点的 prev 节点是否是 head,一般来讲,当你被唤醒,说明你别准许去拿锁了,也就是 head 节点完成了任务释放了锁。然后重复步骤 1。最后返回。

我们看看 shouldParkAfterFailedAcquire 方法:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) return true;
if (ws > 0) { do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

该方法步骤如下:

  1. 获取去上一个节点的等待状态,如果状态是 SIGNAL -1,就直接返回 true,表示可以挂起并休息。
  2. 如果 waitStatus 大于 0, 则循环检查 prev 节点的 prev 的waitStatus,知道遇到一个状态不大于0。该字段有4个状态,分别是 CANCELLED = 1,SIGNAL = -1, CONDITION = -2, PROPAGATE = -3,也就是说,如果大于 0,就是取消状态。那么,往上找到那个不大于0的节点后怎么办?将当前节点指向 那个节点的 next 节点,也就是说,那些大于0 状态的节点都失效这里,随时会被GC回收。
  3. 如果不大于0 也不是 -1,则将上一个节点的状态设置为有效, 也就是 -1.最后返回 false。注意,在acquireQueued 方法中,返回 false 后会继续循环,此时 pred 节点已经是 -1 了,因此最终会返回 true。

再看 parkAndCheckInterrupt 方法(挂起并检查是否中断):

   private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

该方法非常的简单,就是将当前线程挂起,等到有别的线程唤醒(通常是 head 节点中线程),然后返回当前线程是否是被中断了,注意,该方法会清除中断状态。

回到 acquireQueued 方法,总结一下该方法,该方法就是将刚刚创建的线程节点挂起,然后等待唤醒,如果被唤醒了,则将自己设置为 head 节点。最后,返回是否被中断。

再回到 acquire 方法:

    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

在该方法中,如果获取锁失败并被唤醒,且被中断了,那么就执行 selfInterrupt 方法:

    static void selfInterrupt() {
Thread.currentThread().interrupt();
}

将当前线程设置中断状态位。

好了,到这里,整个lock 方法,我们基本就分析完了,可以说,整个方法就是将线程放入到等待队列并挂起然后等待 head 节点唤醒。其中,tryAcquire 方法高频出现,该方法具体实现由子类实现,比如 重入锁,读写锁,线程池的 worker,其中 CountDown 和 Semaphore 实现的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定义的?就是返回 true 表示拿到锁了,返回 false 表示拿锁失败,具体如何实现AQS管不了。但他们都依赖一个极其重要的字段 ------- state。

楼主有必要说说这个字段,该字段定义了当前同步器的状态,如果大家知道 pv 原语的话,应该很好理解这个字段,该字段在 AQS 中是如何定义的:

    /**
* The synchronization state.
*/
private volatile int state;

volatile。该字段可能会被多个线程修改,因此,需要设置为 volatile ,保证变量的可见性。

我们可以看看 重入锁中的公平锁是如何使用该字段的。

        /**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

该方法重写了 tryAcquire 方法,步骤如下:

  1. 获取当前线程,获取锁(同步器)的状态。
  2. 如果同步器等于0,就 CAS 设置 state 为 1,表示同步器被占用了,并且设置同步器的持有线程为当前线程(为了判断重入)。最后返回拿锁成功 true。
  3. 如果不是0,并且当前线程就是同步器的持有线程,说明是重入。那么就将 state 加1,最后返回 true。所以说,当你重入一次,就需要解锁一次,否则下个线程永远拿不到锁。
  4. 如果都不是,返回 false ,表示拿锁失败。

从这里,我们可以看到, statei 字段非常的重要,判断锁是否被持有完全根据这个字段来的。这点一定要注意,而这个设计和操作系统的 pv 由异曲同工之妙。

那么看完了拿锁,再看看解锁,我们可以先猜想一下如何设计,首先肯定是要将 state 字段设置为 0,才能让下个线程拿锁,然后呢?唤醒等待队列中的下个线程。让他尝试拿锁。那到底 doug lea 是不是这么设计的呢?我们来看看。

3. 重入锁的 unlock 方法

该方法调用了AQS 的 release 方法:

    public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

首先尝试释放,如果成功,则唤醒下一个线程。

我们先看看 tryRelease 方法 (需要重写):

        protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

该方法步骤如下:

  1. 计算同步器状态减去1后的值。
  2. 判断同步器线程和当前线程是否相同,如果不同,抛出监视器状态异常。
  3. 判断状态是否是 0,也就是说,如果是0,表示没有线程持有锁了,那么就是设置 free 为 true,并且设置同步器的 thread 属性为null,
  4. 最后设置 state 为 计算的值,这里需要考虑重入。最后返回。

可以看到,如果 state 不是 0 的话,就会返回 false ,后面的步骤就没有了,也就是说,重入锁解锁的时候不会唤醒下一个线程。

如果解锁成功,执行下面的步骤,如果 head 头节点不是 null 并且他的状态不是0,说明有线程可以唤醒,执行 unparkSuccessor 方法。

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

该方法步骤如下:

  1. 获取到头节点的状态。
  2. 如果小于0,CAS 设置状态为0。
  3. 获取到头节点的next 节点,判断是否为null,或者 next 节点是否大于0,如果是null 或者大于0,则从末端节点开始向上查找,直到找到状态小于等于0 的节点。
  4. 最后唤醒该节点的线程。

这个时候,等待在 acquireQueued 方法中,准确的说是 parkAndCheckInterrupt 方法中的 线程被唤醒,开始继续循环,尝试拿锁(需要修改 state 变量),并设置自己为 head。

这里还有一个漏掉的地方,就是 waitStatus 变量,什么时候会大于等于0? 该变量默认是 0,大于 0 的状态是被取消的状态。什么时候会被取消呢? 在acquireQueued 方法中,如果方法没有正常结束,则会执行 finally 中的 cancelAcquire 方法,该方法会将状态变成 1,也就是取消状态。

4 总结

这次我们分析 AQS,也就是锁的的真正实现,只分析了 lock 方法和 unlock 方法,这两个方法是重入锁的基础。CountDown 和 Semaphore 是共享锁,但是基本原理相同,只是将 state 的数字加大便可以实现。而和重入锁等锁相关联的 Condition 则是通过 LockSupport 工具类直接挂起当前线程,并将当前线程添加到等待队列中,当调用 Condition 的 signal 方法时,则唤醒队列中的第一个线程。具体源码我们有机会再分析。

总之,java 重入锁的实现基于 AQS,而 AQS 主要基于 state 变量和队列来实现。实现原理和 pv原语 类似。

good luck!!!!!

上一篇:C#【类库】项目手工转成【ASP.NET Web 应用程序】项目


下一篇:多线程高并发编程(7) -- Future源码分析