聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)

近期总体过了下AQS的结构。也在网上看了一些讲AQS的文章,大部分的文章都是泛泛而谈。又一次看了下AQS的代码,把一些新的要点拿出来说一说。

AQS是一个管程。提供了一个主要的同步器的能力,包括了一个状态,改动状态的原子操作。以及同步线程的一系列操作。它是CLHLock的变种,CLHLock是一个基于队列锁的自旋锁算法。

AQS也採用了队列来作为同步线程的结构。它维护了两个队列。一个是作为线程同步的同步队列,还有一个是基于Unsafe来进行堵塞/唤醒操作的条件队列。

所以理解队列操作是理解AQS的关键。

1. 理解 head, tail引用

2. 理解 next, prev引用

3. 理解队列节点何时入队,何时出队

关于head引用,须要记住的是

1. head引用始终指向获得了锁的节点,它不会被取消

acquire操作成功就表示获得了锁,acquire过程中假设中断,那么acquire就失败了,这时候head就会指向下一个节点。

* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.

而获得了锁的之后,假设线程中断了,那么就需要release来释放head节点。

假设线程中断了不释放锁,就有可能造成问题。所以使用显式锁时。必需要在finally里面释放锁

Lock lock = new ReentrantLock();
lock.lock();
try{
// 假设中断,能够处理获得抛出,要保证在finally里面释放锁
}finally{
lock.unlock();
}

再来看看获得锁时对head引用的处理,仅仅有节点的前驱节点是head时,它才有可能获得锁,而获得锁之后,要把自己设置为head节点,同一时候把老的head的next设置为null。

这里有几层含义:

1. 始终从head节点開始获得锁

2. 新的线程获得锁之后,之前获得锁的节点从队列中出队

3. 一旦获得了锁,acquire方法肯定返回,这个过程中不会被中断

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

关于tail引用。它负责无锁地实现一个链式结构。採用CAS + 轮询的方式。

节点的入队操作都是在tail节点

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

next引用在队列中扮演了非常关键的数据。它出现的频率非常高。关于next引用。它有几种值的情况

1. next = null

2. next指向非null的下一个节点

3. next = 节点自己

next = null的情况有三种

1. 队尾节点,队尾节点的next没有显式地设置。所以为null

2. 队尾节点入队列时的上一个队尾节点next节点有可能为null,由于enq不是原子操作,CAS之后是复合操作

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
// 这个期间next可能为null
 t.next = node;
return t;
}
}
}
}

3. 获取锁时,之前获取锁的节点的next设置为null

if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

next指向非null的下一个节点,这样的情况就是正常的在同步队列中等待的节点,入队操作时设置了前一个节点的next值,这样能够在释放锁时,通知下一个节点来获取锁

 private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

next指向自己,这个是取消操作时,会把节点的前一个节点指向它的后一个节点,最后把next域设置为自己

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;         node.thread = null;         // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;         // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;         // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;         // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }             node.next = node; // help GC
        }
    }

prev引用比較简单,它主要是维护链表结构。CLHLock是在前一个节点的状态自旋,AQS里面的节点不是在前一个状态等待,而是释放的时候由前一个节点通知队列来查找下一个要被唤醒的节点。

最后说说节点进入队列和出队列的情况。

节点入队列仅仅有一种情况。那就是它的tryAcquire操作失败,没有获得锁,就进入同步队列等待,假设tryAcquire成功了,就不须要进入同步队列等待了。AQS提供了充分的灵活性。它提供了tryAcquire和tryRelase方法给子类扩展。基类负责维护队列操作,子类能够自己决定是否要进入队列。

所以实际子类扩展的时候有两种类型,一种是公平的同步器,一种是非公平的同步器。这里须要注意的是,所谓的非公平,不是说不使用队列来维护堵塞操作,而是说在获取竞争时,不考虑先来的线程,后来的线程能够直接竞争资源。非公平和公平的同步器竞争失败后,都须要进入AQS的同步队列进行等待,而同步队列是先来先服务的公平的队列。

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) {
super(permits);
} protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
} /**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) {
super(permits);
} protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

出队列有两种情况。

1. 后一个线程获得锁是。head引用指向当前获得锁的线程。前一个获得锁的节点自己主动出队列

2. 取消操作时。节点出队列,取消仅仅有两种情况,一种是线程被中断,另一种是等待超时

上一篇:聊聊高并发(四十四)解析java.util.concurrent各个组件(二十) Executors工厂类


下一篇:timer计算两个方法执行时间