AQS框架源码分析-AbstractQueuedSynchronizer

前言:AQS框架在J.U.C中的地位不言而喻,可以说没有AQS就没有J.U.C包,可见其重要性,因此有必要对其原理进行详细深入的理解。

1.AQS是什么

在深入AQS之前,首先我们要搞清楚什么是AQS。AQS全称是AbstractQueuedSynchronizer,我们直接查看AQS源码的注释。

AQS框架源码分析-AbstractQueuedSynchronizer

大致意思就是说:AQS提供了实现阻塞锁和相关同步器并依赖先进先出(FIFO)等待队列的框架。

AQS框架源码分析-AbstractQueuedSynchronizer

AQS依赖一个原子数值作为锁的状态,子类可以有多个状态值,只能通过原子方法区操作该值,从而保证同步。

通过第一段的注释大致总结下AQS是什么:

①AQS是一个同步的基础框架,基于一个先进先出的队列

②锁机制依赖一个原子值的状态。

③AQS的子类负责定义与操作这个状态值,但必须通过AQS提供的原子操作。

④AQS剩余的方法就是围绕队列,与线程阻塞唤醒等功能。

2.重要成员变量

AQS中有两个重要的成员变量:Node和ConditionObject。

AQS框架源码分析-AbstractQueuedSynchronizer

①Node的作用是存储获取锁失败的线程,并且维护一个CLH FIFO队列,该队列是会被多线程操作的,所以Node中大部分变量都是被volatile修饰,并且通过自旋和CAS进行原子性的操作。CLH的数据结构如下:

AQS框架源码分析-AbstractQueuedSynchronizer

Node有一个模式的属性:独占模式共享模式,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。

Node源码如下:

 static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); // 共享模式
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; // 独占模式 /** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1; // 表明线程已处于结束状态(被取消)
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1; // 表明线程需要被唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; // 表明线程正处于条件队列上,等待某一条件
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3; // 共享模式下同步状态会被传播 /**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus; /**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev; /**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next; /**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread; /**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter; /**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
} /**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
} Node() { // Used to establish initial head or SHARED marker
}
// 线程加入等待结点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 线程加入条件对列,会带上线程的状态值waitStatus
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

②ConditionObject:条件队列,这个类的作用从AQS的注释上可知。

AQS框架源码分析-AbstractQueuedSynchronizer

该类主要是为了让子类实现独占模式。AQS框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。

ConditionObject源码如下(对主要代码进行了注释):

 public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter; // 存储条件对列中第一个节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 存储条件对列中最后一个节点 /**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { } // Internal methods /**
* Adds a new waiter to wait queue. // 增加一个新的节点到等待队列中
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个节点的状态已经结束,则直接清理掉
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 拆分已经处于结束状态的节点 也就是清除掉这类节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个新的节点,带上结点状态,表明结点处于条件对列上
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
条件队列中加入节点都是从队尾加入,并且从下面代码可知,每次都会存储最后一个节点的值。
当最后一个节点为空时,说明队列中不存在节点,所以将node赋值给第一个节点,否则将节点加入对列尾
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node; // 存储最后一个节点的值
return node;
} /**
* 唤醒节点
* 移除和转换节点直到节点状态处于未结束或者为空 (节点移除相当于唤醒)
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 当next节点为null时,则将lastWaiter赋值为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; // 切断当前节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
} /**
* 唤醒所有节点
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 循环唤醒所有节点,代码还是比较容易理解
// 将每个节点直接截断即可
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
} /**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() { // 删除处于结束状态的节点
Node t = firstWaiter;
Node trail = null;
// 第一个节点为空,直接返回
// 这里会遍历所有节点
while (t != null) {
Node next = t.nextWaiter; // 记录下一个节点的值
// 当节点状态不为CONDITION
if (t.waitStatus != Node.CONDITION) {
// 首先将当前节点的下一个节点赋值为空,切断当前节点链路
t.nextWaiter = null;
// 如果追踪节点为空的时候,则存储第一个节点的值为next,因为当前节点状态不为CONDITION需要清理
if (trail == null)
firstWaiter = next;
else // 在追踪节点串联下一个节点,主要是为了存储最后一个节点的值
trail.nextWaiter = next;
if (next == null) // 当next为空时,则存储trail为最后一个节点,将最后一个节点值存储下来
lastWaiter = trail;
}
else // 当节点状态为CONDITION时,将该节点赋值给trail
trail = t;
t = next; // 将next赋值给t,继续遍历
}
} // public methods /**
* 唤醒等待时间最长的节点,使其拥有锁
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 如果线程不是独占资源,则抛出异常,从这里也说明ConditionObject只能用在独占模式中
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
} /**
* 唤醒所有等待节点
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
} /**
* 节点不间断等待
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
} /*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/ /** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1; /**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
} /**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
} /**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
} /**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
} /**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
} /**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
} // support for instrumentation /**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
} /**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
} /**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
} /**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

3.AQS成员函数

由于AQS分独占模式和共享模式,因此这里按独占、共享模式的顺序对AQS的成员函数进行分析。

①acquire(int arg)

独占模式下获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,整个过程忽略中断。源码如下:

  /**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该函数执行流程:

A.如果tryAcquire()成功获取资源,则直接返回。

B.直接获取资源失败,则通过addWaiter()将线程加入队列尾,并标记为独占模式。

C.通过acquireQueued()让线程在等待队列中获取资源,通过自旋方式,一直获取到后才返回。如果在等待过程中被中断过,则返回true,否则返回false。

D.如果线程在等待获取资源的过程中被中断,只有在获取到资源后才会去响应,执行selfInterrupt进行自我中断。

#1.tryAcquire(int)

该方法是在独占模式下获取资源,成功-ture,失败-false。

  protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

直接调用该方法会抛出异常,因为AQS只是一个框架,只是定义该接口,具体实现需在子类中实现。

#2.addWaiter(Node mode)

将当前线程加入等待队列的队尾,并返回当前线程所在的节点。

 private Node addWaiter(Node mode) {
// 创建节点,以独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试将节点快速放入队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 主要通过CAS入队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速入队尾失败,则通过enq方式入对尾
enq(node);
return node;
}

CAS操作后面讨论,这里先看enq(final Node node)入队尾操作。

 private Node enq(final Node node) {
// 这里是CAS的“自旋”操作,直到将节点成功加入队尾
for (;;) {
Node t = tail;
// 因为每次入队都是从队尾加入,当队尾为null,则表明队列为null,则需初始化头结点
// 并将尾节点也指向头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else { // 通过CAS入队尾,自旋操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

在线程入队尾后,就需要acquireQueued函数了,该函数的作用是让线程拿到资源,当然还是通过自旋的方式来拿资源,也是就是一个排队的过程。

 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 标记是否成功拿到资源
try {
boolean interrupted = false; // 标记在等待过程中是否被中断过
// 自旋操作
for (;;) {
final Node p = node.predecessor(); // 拿到当前节点的前向节点
// 如果前向节点为head,则表明当前节点排在第二位了,已经得到获取资源的资格
if (p == head && tryAcquire(arg)) {
// 成功拿到资源后,将head节点指向当前节点
// 从这里可以看出,head节点就是当前获取到锁的节点
setHead(node);
// 将原来head节点的next设置为null,方便GC回收以前的head节点,也就意味着之前拿到锁的节点出队列了
p.next = null; // help GC
failed = false;
return interrupted; // 返回在排队过程中线程是否被中断过
}
// 到这里,表明线程处于等待状态,自旋直到被unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 获取资源失败,则将节点标记为结束状态
cancelAcquire(node);
}
}

在线程排队等待的过程中,有两个关键函数shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()。

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前驱节点的状态
if (ws == Node.SIGNAL)
// 如果前驱节点正处于被唤醒的状态,则正常排队等待即可
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 前驱节点处于结束状态
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/*
*继续向下找,一直找到处于正常等待状态的节点,将当前节点插入其后,其他
*无用节点形成一个链,会被GC
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 前驱节点状态正常,则把前驱节点的状态设置为SIGNAL,这样前驱节点拿到资源后,可通知下当前节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

分析以上源码可知:只有当前驱节点的状态为SIGNAL时,当前节点才能正常排队等待,否则需找到一个合适的节点next位置来进行排队等待。

   private final boolean parkAndCheckInterrupt() {
// 使线程进入waitting状态
LockSupport.park(this);
return Thread.interrupted(); // 返回线程是否被中断过
}

该函数作用:当节点正常进入排队后,让线程进入等待状态。

至此acquireQueued()函数总结完成,该函数的具体执行流程:

#1.首先检查节点是否可以立即获取资源。

#2.如果不能立即获取资源,则进行排队,这里需要找到正确的排队点,直到unpark或interrupt唤醒自己。

#3.唤醒后,判断自己是否有资格获取资源,如果拿到资源,则将head指向当前节点,并返回在等待过程是否被中断过,如果没拿到资源,则继续流程2。

acquire小结

到这里acquire(int)函数分析结束,这个函数非常重要,这里再贴上源码:

   public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

#1.调用子类的tryAcquire直接获取资源,如果成功则返回。

#2.如果流程1失败,则将线程加入等待队列的队尾(独占模式)。

#3.在acquireQueued中排队,通过自旋获取资源,直到获取资源才返回。如果在排队过程中线程被中断过返回true,否则返回false。

#4.在排队过程中被中断是不响应的,只有获取到资源后,才进行自我中断,补上中断标记。

整个过程的流程图如下:

AQS框架源码分析-AbstractQueuedSynchronizer

②release(int)独占模式释放资源。

  public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒队列中下一个线程
return true;
}
return false;
}

释放锁的函数很简单,通过tryRelease尝试释放资源,然后唤醒队列中的其他线程。

tryRelease(int):

    protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

与tryAcquire函数一样,该方法需要子类去实现,如果直接调用会抛异常。

unparkSuccessor(Node node):

唤醒等待队列中的下一个线程,这里唤醒的是等待队列中最前边那个未放弃的线程,注意看代码注释。

  private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 获取当前线程的状态
if (ws < 0) // 如果当前线程状态处于可用状态,则直接将状态值置0
compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 下一个节点
if (s == null || s.waitStatus > 0) { // 如果节点为null或节点已处于结束状态
s = null;
// 从队列尾向前遍历,找到next可用的节点,状态小于0就可用,这里的节点是队列中最前边的可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);// 唤醒next线程
}

独占模式的主要函数分析完毕,接下来看共享模式。

②acquireShared(int)

共享模式下获取资源,如果成功则直接返回,否则进入等待队列,通过自旋直到获取资源为止。

 public final void acquireShared(int arg) {
// 共享模式下获取资源,如果获取失败,则进入等待队列
// 同样该函数需要子类去实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); // 进入等待队列直到锁获取到为止
}

tryAcquireShared(int)函数返回值,需要注意下:

AQS框架源码分析-AbstractQueuedSynchronizer

负数:表示获取失败;

0:获取成功,但没有剩余资源;

正数:获取成功,且有剩余资源;

#1.doAcquireShared(int)

将线程加入队列尾,然后通过自旋获取资源,直到得到资源才返回。

  private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 将线程加入队尾,通过共享模式
boolean failed = true;// 是否成功
try {
boolean interrupted = false; // 在自旋过程中是否被中断过
for (;;) {
final Node p = node.predecessor(); // 前驱节点
if (p == head) { // 这里表明当前节点处于head的next位,此时node被唤醒,很可能是head用完来唤醒
int r = tryAcquireShared(arg); // 获取资源
if (r >= 0) { // 成功
setHeadAndPropagate(node, r);// 将head指向自己,还有剩余资源可用的话再唤醒之后的线程
p.next = null; // help GC 无用链,帮助GC
if (interrupted) // 如果等待过程中被中断过,将中断补上
selfInterrupt();
failed = false;
return;
}
}
// 线程未排在head之后,继续排队,进入waiting状态,等着unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 中断标记
}
} finally {
if (failed)
cancelAcquire(node);
}
}

整个流程与独占模式的acquireQueued很相似,只是共享模式下,在唤醒自己后,如果还有剩余资源,需要唤醒后续节点。

setHeadAndPropagate(node, int)

将head节点设置为当前节点,如果还有剩余资源,则唤醒下一个线程。

  private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 将队列中的head执行当前节点
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 如果还有剩余资源,则唤醒后续线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

这里除了将head设置成当前线程,如果有剩余资源,需要唤醒后续节点。

doReleaseShared()

  private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 自旋操作
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果head状态为SIGNAL,则需唤醒后续节点
// CAS一下当前节点的状态,判断是否为SIGNAL,如果是则置为0,否则继续循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒后继节点
}
// 如果head节点状态为0,且CAS置为传播状态失败,则继续循环,因为if操作中会改变节点的状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 如果head节点发生了改变,则继续自旋操作,防止上述操作过程中添加了节点的情况 // loop if head changed
break;
}
}

该方法的作用主要是用于唤醒后续节点。

共享模式获取锁操作与独占模式基本相同:先直接获取资源,如果成功,直接返回;如果失败,则将线程加入等待队列尾,直到获取到资源才返回,整个过程忽略中断。不同点在于共享模式下自己拿到资源后,还需要唤醒后续节点。

#2.releaseShared(int)

同享模式下释放资源

 public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放资源
doReleaseShared(); // 唤醒后续节点,前面已经分析
return true;
}
return false;
}

共享模式释放资源与独占模式类似,但是独占模式下需要完全释放资源后,才会返回true,而共享模式没有这种要求。

总结

这里只是对AQS的顶层框架进行了简要的分析,具体需要深入其子类中去,AQS的子类按模式分类可聚合成以下几类:

#1.独占模式:

ReentrantLock:可重入锁。state=0独占锁,或者同一线程可多次获取锁(获取+1,释放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor类中的内部类)线程池类。shutdown关闭空闲工作线程,中断worker工作线程是独占的,互斥的。

#2.共享模式:
Semaphore:信号量。 控制同时有多少个线程可以进入代码段。(互斥锁的拓展)
CountDownLatch:倒计时器。  初始化一个值,多线程减少这个值,直到为0,倒计时完毕,执行后续代码。

#3.独占+共享模式:
ReentrantReadWriteLock:可重入读写锁。独占写+共享读,即并发读,互斥写。

后续对这些类进行详细分析。


by Shawn Chen,2019.1.29日,下午。

上一篇:Objective-C: NSFileManager 的使用


下一篇:《Head First Java(第二版)》中文版 分享下载