从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程
概述
本文将以ReentrantLock为例来讲解AbstractQueuedSynchronizer的运行流程,主要通过源码的方式来讲解,仅包含大体的运行流程,不会过于深入。
ReentrantLock 介绍
ReentrantLock 是JDK提供的可重入锁实现类,可用其替换synchronized来实现锁重入效果;其底层实现主要是依靠AbstractQueuedSynchronizer,本文将通过ReentrantLock来观察AbstractQueuedSynchronizer的运行流程。
AbstractQueuedSynchronizer
介绍
关于AbstractQueuedSynchronizer(以下简称AQS),JDK是这样子描述的:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
大体意思就是“提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)”,AQS不是一个功能完整的类,而是一个提供了一套依赖于FIFO等待队列的流程框架,该框架可用于实现锁等同步器的。AQS中没有使用任何锁相关的API,其实现主要依靠CAS (Compare And Swap),是一个优秀的lock-free 编程实践。
AQS数据结构
AQS中主要包含以下三个字段
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
这三个字段都标记了volatile
,其中head
和 tail
主要用于维护AQS的FIFO双向队列,该队列是AQS的核心,只由AQS该维护该队列,子类实现不会去维护该队列;state
用于标记当前同步器的转态,AQS不会对该字段做任何操作,该字段由子类去维护,但AQS提供了修改state
的方法,其中的compareAndSetState
是子类用的最多的,主要用于实现多线程对同步器的抢夺。
AQS 主要方法
在开始了解AQS的运行流程之前,我们先看一下在使用AQS时需要关注的四个方法:
// 抢夺资源流程的入口,AQS暴露出的API,由自定义同步器来调用,ReentrantLock 的lock方法就是去调用该方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 由子类实现该方法,抢占资源逻辑在这个方法实现,该方法由AQS在抢夺资源流程中调用。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 释放资源流程的入口,AQS暴露出的API,由自定义同步器来调用,ReentrantLock 的unlock方法就是去调用该方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 由子类实现该方法,释放资源逻辑在这个方法实现,该方法由AQS在释放资源流程中调用。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
其中acquire
和 tryAcquire
跟同步器资源抢夺相关;release
和 tryRelease
和同步器资源释放相关。acquire
和release
方法是AQS流程的入口,通过这两个方法来走资源抢夺和资源释放的流程,该流程中包含了FIFO队列维护、线程状态管理等操作,是整个AQS的核心,自定义同步器中会去调用这两个方法;而tryAcquire
和tryRelease
方法对应线程资源抢占和释放操作,这两个方法中只关心线程是否抢占/释放资源成功,不会维护FIFO队列和线程状态,由子类来实现这两个方法,这个是自定义同步器的核心代码,由这两个方法来实现不同同步器的不同功能。
这四个方法对应的是线程独占流程,共享流程使用的是
acquireShared
,tryAcquireShared
,releaseShared
,tryReleaseShared
这四个方法,ReentrantLock是线程独占模式,所以本文主要讲解线程独占流程,但线程共享流程和独占流程差别不大,感兴趣的同学可以自行了解。
下面看一下ReentrantLock中是如何去使用这四个方法的:
// 简化版代码,方便演示
public class ReentrantLock implements Lock, Serializable {
private final Sync sync = new Sync();
public void lock() {
this.sync.acquire(1);
}
public void unlock() {
this.sync.release(1);
}
static final class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 当前锁没有被占用
if (compareAndSetState(0, acquires)) { // 尝试去拿锁
setExclusiveOwnerThread(current); // 标记当前线程为锁的持有者
return true; // 返回拿锁成功
}
}
else if (current == getExclusiveOwnerThread()) { // 当前锁被占用,且是被自己占用,走重入逻辑,对state做累加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 锁被占用或者拿锁失败,返回拿锁失败
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 锁被完全释放
free = true; // 标记锁已被释放
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
可以看出,ReentrantLock的核心功能是由内部类Sync来完成的,而这个Sync类就是继承了AQS,并重写了tryAcquire
和 tryRelease
方法,这两个方法的实现也很简单
-
tryAcquire
中主要是通过state字段是否等于0来判断当前锁是否锁住了,没有锁住,则当前线程使用CAS去尝试将state标记为acquires,标记成功则代表拿锁成功,返回true
,否则返回false
。 -
tryRelease
中则是通过判断释放后state是否为0来判断当前锁是否被完全释放,若完全释放则返回true
,否则返回false
。
可以看出这个两个方法的实现只关注锁的占用和释放是否成功,没有关心FIFO队列和线程状态。那么FIFO队列和线程状态是如何来维护的呢?答案就是在acquire
和release
方法中;从ReentrantLock源码也可以看到,lock
和unlock
方法只是调用了一下acquire
和release
方法,所以接下来我们重点来看一下这两个方法的实现
Node
开始前,先看一下队列Node的数据结构
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
其中waitStatus
字段代表了节点的等待状态,共包含了5个值。
-
INIT(0)
: 节点的默认状态为0。当线程释放资源后,也会将自己的节点状态设置为0。 -
CANCELLED(1)
: 当前线程节点已取消等待,为CANCELLED
的节点会在acquire
方法里的流程中被清除。 -
SIGNAL(-1)
: 当前节点的后续节点已沉睡,需要被唤醒。会在release
方法里的流程中将其后续节点唤醒。 -
CONDITION(-2)
和PROPAGATE(-3)
则和条件锁及共享模式有关,本文不过多解释。
其中可以看出,状态可以分为两类,无效转态和有效状态,大于0则代表当前节点无效了,需要被移除,小于等于0则是有效节点,需要继续去尝试获取资源。
acquire
先看一下acquire
方法的实现
public final void acquire(int arg) {
// 调用我们自己实现的tryAcquire去获取资源,获取失败则尝试将自己加入到等待队列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再看一下addWaiter
方法
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 为当前线程新建一个node,并将其加入到队列尾部,这里需要注意的是添加时是先将新节点的prev设置为老的尾部节点,使用CAS将新节点设置为tail后才会将老节点的next设置为新节点,
// 这样做的理由是为了防止并发问题,后续的对队列的修改也是包括两种遍历,一个是从前往后,一个是从后往前。
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
添加完节点后,就会尝试去从队列中获取资源,这里也是AQS的核心了,看一下acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
// 取到当前节点的上一个节点
final Node p = node.predecessor();
// 如果上一个节点是头节点,则证明前面没有在等待的线程了,轮到当前线程去尝试获取资源。
if (p == head && tryAcquire(arg)) {
// 获取资源成功,则将当前节点设为头节点,并设置为空节点
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 判断前面一个节点是否是一个有效的等待线程节点(没有取消等待的线程),是则将自己睡眠,不是则将前面已取消的线程节点移除,直到找到一个有效的线程节点作为自己的前序节点并将其状态设为SIGNAL,然后再走一次循环。
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
总结一下acquire
方法的流程:
- 调用子类实现的
tryAcquire
方法去获取资源。 - 获取成功则当前线程继续执行,否则将自己加入到等待队列。
- 进入等待队列后,循环的去判断自己的上一个节点是否是头结点,是则再次调用子类实现的
tryAcquire
方法去获取资源,否则进入沉睡,等待唤醒。 - 在沉睡前,会将前面已取消的线程节点移除,直到找到一个有效的线程节点做为自己的前序节点,并将其状态设为SIGNAL。
一句话就是acquire
方法会去尝试获取资源,获取失败则将自己加入到等待队列,并沉睡,等待唤醒。
举个例子,demo1线程获取资源失败后,当前的队列状态为:
若是有多个线程等待,则队列状态为:
由图可以看出,若当前节点已沉睡,则需要在沉睡前将前一个节点的转态设为SIGNAL
。
release
前面的acquire
方法说到,等待队列中的线程会进入沉睡,等待唤醒,那么由谁来唤醒呢?答案就是release
方法,也就是我们在调用unlock的时候,接下来我们先看一下release
方法的实现
public final boolean release(int arg) {
// 调用子类实现的tryRelease方法去释放资源
if (tryRelease(arg)) {
Node h = head;
// 释放成功,且等待队列中还有节点,则去唤醒下一个等待线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
再看一下unparkSuccessor
方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
// 如果下一个节点已经取消了,则从后往前遍历,找到第一个等待线程节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 找到了等待线程节点,并将其唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
唤醒后线程将会继续在acquireQueued
方法里执行,并尝试再去获取资源。
总结一下release
方法的流程:
- 调用子类实现的tryRelease方法去释放资源
- 释放成功则判断当前队列是否有等待线程
- 有则找到第一个等待线程,并将其唤醒。
从流程上也可以看出,若tryRelease
方法中抛了异常,则会导致所有沉睡的线程将无法被唤醒。
AQS总结
AQS是模版设计模式的一种实践,其将线程调度等通用逻辑进行了实现,只预留了tryAcquire
和 tryRelease
方法供子类实现自己的并发工具,大大的减轻了实现的复杂难度。
总结
ReentrantLock 是使用AQS实现的,其主要是实现了AQS的tryAcquire
和 tryRelease
方法,且只需要在lock
和unlock
方法中调用一下acquire
和release
方法即可,可以看出AQS非常强大!