AQS独占模式(基于ReentrantLock公平锁) — 源码解析
一、 基本概念
AQS
全称 AbstractQueuedSynchronizer
, 是JUC包下的一个抽象类。可以说它是整个JUC并发的基础框架。非常重要。在ReentrantLock
, ReentrantReadWriteLock
,Semaphore
,CountDownLatch
等都是继承了AQS实现的。
二、线索追踪
以ReentrantLock 为例,我们来探究其如何基于AQS实现独占锁的效果的。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
上面是ReentrantLock的两个构造方法。
不难看出, 默认情况下我们使用的是无参构造 那么底层为我们创建了一个new NonfairSync()
对象 也就是所说的非公平锁。 当我们想要使用公平锁 就需要使用带参构造器, new ReentrantLock(true)
这种方式来构建一个公平锁,而底层则使用new FairSync()
。 之后在用户层面调用的lock
unlock
等方法其实内部都是调用了sync
这个对象引用中的方法。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer{...}
static final class NonfairSync extends Sync{...}
static final class FairSync extends Sync{...}
以上是ReentrantLock
中的成员属性及内部类
private final Sync sync;
:则是构造方法底层创建的对象的引用。可能是NonfairSync
也可能是 FairSync
abstract static class Sync extends AbstractQueuedSynchronizer
:是一个静态抽象内部类,它则继承了我们的主角AQS
。
static final class NonfairSync extends Sync
static final class FairSync extends Sync
:
而后面这两个静态内部类则是继承且实现了Sync
中的方法。 它们与AQS
之间是典型的模板方法模式。
三、源码分析 (以公平锁为例)
由于当用户层面执行了ReentrantLock 中的方法 ,内部会引发多层类与类之间方法的调用。
那么我们会根据他们的调用链来解析源码。
在这之前先介绍下AQS中的属性
AQS成员(必要准备)
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;
//取消状态 ReentrantLock并不涉及
static final int CANCELLED = 1;
//信号状态 处于该状态的Node会当释放锁时 会唤醒其后继节点来抢锁
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
//用来维护双向链表的 头指针 和 尾指针
private transient volatile Node head;
private transient volatile Node tail;
//AQS 是否被被占用
// state = 0 未被占用
// state = 1 被占用
// state > 1 被占用且体现可重入性
private volatile int state;
1.lock
//ReentrantLock.lock()
public void lock() {
sync.lock();
}
//FairSync.lock()
final void lock() {
acquire(1);
}
//AQS.acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那么我们直接来看AQS
的 acquire
方法
先说一下该方法的主要逻辑:
进来的线程尝试去获取锁
1.获取成功则直接退出了。
2.获取不到则说明已经有其它线程提前获取了这把锁。那么此时该线程就需要进入队列,入队后还会去尝试获取一下锁,若还是获取不到则 park() ,直到被其它线程unpark() 唤醒后继续去获取锁。
接下来分析下逻辑判断:
!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
: 这里涉及到了三个方法。
1.1.tryAcquire()
该方法是AQS
子类FairSync
重写的,也就是模板方法。
返回值为 true时 该线程抢锁成功
返回值为 false时 该线程抢锁失败
protected final boolean tryAcquire(int acquires) {
//拿到当前要获取锁的线程的引用
final Thread current = Thread.currentThread();
//拿到当前AQS的state值
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里再说一下state值的作用:
- 当state = 0 时说明AQS独占锁处于释放状态 没有线程持有
- 当state = 1 时说明AQS独占锁被线程占有
- 当state > 1 时说明AQS独占锁被线程占有 且是重入状态。
上述代码有3个分支 ,根据各个条件判断可知
条件成立:说明当前独占锁处于释放状态。那么此时该锁可以被获取。 (进入第一个分支)
条件不成立:说明当前独占锁处于被持有状态,若此时想要获取这把锁会有两种情况:
? 1.获取成功 持有这把锁得是当前线程 AQS的可重入性 (进入第二个分支)
? 2.获取失败 持有这把锁的是其它线程 (进入第三个分支)
分支1:
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
此时锁是处于释放状态,也就说明当此时有很多线程去获取这把锁,那么会存在并发竞争问题。
那么思考一下:这些线程一定都会有权限来争抢这把锁吗?
答案是不一定,因为这是把公平锁。下面分两种情况来考虑:
- 当前队列是空 说明这些线程都是第一次来抢锁,此时这些线程都有权利来争抢锁, 谁先抢到是谁的。
- 当前队列不为空 说明这把锁在释放之前已经有线程来尝试获取了 但是没有获取到,乖乖的在后面排队了。那么此时只有排在Head节点的后继节点的线程才有权限拿到这把锁,因为要遵循先来后到且此时不会存在并发问题了。
在这里先说一下当队列形成时的情况:头节点是当前持有锁的线程 ,而只有头节点的后继节点才有权限获取这把锁(前提是头节点释放了这把锁)。
我们来看方法体中的这个判断:!hasQueuedPredecessors() &&compareAndSetState(0, acquires)
hasQueuedPredecessors
//AQS. hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
该方法中 最后的逻辑运算写的非常精彩:
我们按结果来分析
1.当结果是true 也就是当前线程没有权限获取锁:
-
h != t && s.thread != Thread.currentThread()
:这个很好理解,当前有排队的线程,而当前线程不是头节点的后继节点,因此没有权限来获取锁。 -
h != t && (s = h.next) == null
: 这个可能很多人会有疑问,h != t 了 为什么 h.next 会为null呢?此时就是node入队的一种极限情况。 先说一下node入队的步骤吧:1. pred = tail (pred也指向尾节点) 2. node.prev = pred (将node的前驱指针指向尾节点) 3.casTail(node) (将tail指针指向node) 4.pred.next = node (将原尾节点的后继指针指向node)。 而此时在入队前是处于队列中只有一个节点(head== tail) 的时候,当在入队的过程中步骤3执行完了 但是还没有执行步骤4时 就会出现 head != tail 且 head.next == null的情况了。 而这时说明已经有线程入队了,但是没有完全入队。这时也只能该即将完成入队的线程有权限获取锁。
2.当结果是false 也就是当前线程有权限争抢锁:
-
h != t
条件不成立 也就是此时h==t
, 这个也有两种情况-
h == t == null
是空队列并没有形成排队的情况,那么所有线程都有权限来争抢锁 -
h == t != null
该种情况是处于 当持有锁的线程还未释放时,有线程来抢锁但是由于失败而入队,在入队前他需要帮助当前持有锁的线程将它入队作为头节点,然后自己再去排队,但是在自己帮助当前持有锁的线程入队作为头节点后 ,当前持有锁的线程释放了锁,且自己还未入队时的这种情况。 那么所有线程也都有权限来争抢锁。
-
-
h != t &&((s = h.next) == null || s.thread == Thread.currentThread())
: 这种情况非常好理解当前有排队的线程,且当前线程是头节点的后继节点。
至此 hasQueuedPredecessors()
方法讲解完 ,该方法写的很短小精妙,但也是AQS中最难理解的。
小结: 该方法判断当前线程的前面是否有排队的线程。
返回true 说明当前线程前面有排队的线程 则当前线程没有权限获取锁
返回false 说明当前线程前面没有排队的 则当前线程可以去争抢锁
当hasQueuedPredecessors()
返回false时 取反为true 则会走到后面一个的条件
compareAndSetState(0, acquires)
: 该方法是CAS操作,用来设置state值, true 设置成功 false 设置失败。
由于当 head == tail 时会存在并发,因此要使用CAS。
setExclusiveOwnerThread(current);
: 当某个线程成功设置了state值后,最后要将exclusiveOwnerThread设置为当前抢锁成功的线程。 由于只有一个线程会成功设置state,因此执行该方法不用考虑并发。成功抢锁 返回true.
分支2:
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
说明此时是 AQS的可重入,只有当前持有锁的线程才能进入该分支,那么不会存在并发问题,只需要在原state值的基础上增加就可以了。
分支3:
也就是剩下那些没有抢到锁的线程,都会返回false,说明尝试获取锁失败。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再来看下acquire
方法,
当tryAcquire
为true时,取反为false 不需要执行后面的逻辑。说明获取锁成功,那么整个方法就执行结束了。
当tryAcquire
为false时,取反为true 则需要执行后面的方法。说明尝试获取锁失败,则下面需要入队休眠等操作。
那么接下来看入队的操作 addWaiter(Node.EXCLUSIVE)
1.2 addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node node = new Node(Thread.currentThread(), mode);
:给当前抢锁失败的线程创建节点,且模式为EXCLUSIVE
独占模式。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
该代码是之前在hasQueuedPredecessors
方法中提到的入队操作.
在这里用到了compareAndSetTail(pred, node)
:CAS设置尾指针,因为此时可能会有很多失败的线程抢着入队,因此会有并发操作。
enq(node);
:执行到该方法有下面2种情况:
-
head == tail == null
目前还未形成队列 。 - 目前形成了队列,且抢着先入队的线程CAS失败。
1.3 enq()
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先是一个自旋,不难想象因为入队存在并发问题,同一时间只能有一个线程入队成功,那么这里使用了自旋。
该自旋中有两个分支,也就是为了处理前面所说来到这个方法的两种情况。
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
该分支是目前还未形成队列,head == tail == null
。
当队列为空,且当前锁被某个线程持有时,那么后面进来排队的线程需要先帮助该持有锁的线程作为头节点入队,且该头节点的thread属性为null。将头节点设置好后,继续进入自旋 然后自己入队。
可以低俗的理解为: 这些抢锁失败的线程想要抢着给当前持有锁的线程擦屁股,献殷勤,可能是觉得自己表现好了能先排队,但是事实并非想的这么好。就算帮老大擦了屁股,也不一定会得到奖赏(先入队)。
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
这里还是先前入队的操作不用多说了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那么我们再来看下acquire
方法 。
addWaiter(Node.EXCLUSIVE)
:尝试抢锁失败的线程入队后 一定会返回自己的node节点。
那么可以写成acquireQueued(node, arg)
:该方法中会再次尝试抢锁,若仍失败则park()等待。
1.4 acquireQueued()
返回true 表示线程挂起过程中被中断唤醒过
返回false 表示线程挂起过程中未被中断过
final boolean acquireQueued(final Node node, int arg) {
//true 表示当前节点被取消了 false表示当前节点未被取消
boolean failed = true;
try {
//表示当前节点线程是否被中断
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
当线程首次进入该方法,或者被unpark唤醒后,都会先走下面的分支,目的是尝试获取锁。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
final Node p = node.predecessor();
:获取当前线程节点的前驱节点。
if (p == head && tryAcquire(arg))
: 如果当前线程的前驱节点是头节点 则有权力去尝试获取锁也就是执行tryAcquire(arg)
方法 ,该方法前面已经说过了。
条件成立: 说明抢锁成功,则需要将自己设置为头节点,然后再将原来的头节点移除队列 。
条件不成立:说明抢锁失败 则会进入后面的逻辑。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
该判断中有两个方法
1.5 shouldParkAfterFailedAcquire()
该方法目的是判断 该获取锁失败的节点 是否要park
返回值:
? true: 要park
? false: 不park
参数 :
? pred:为当前获取锁失败的线程节点的前驱节点
? node:为当前获取锁失败的线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点的状态引用赋值
int ws = pred.waitStatus;
//前驱节点是SIGNAL 则要被park
if (ws == Node.SIGNAL)
return true;
//前驱节点是CANCELLED 取消状态 在ReentrantLock里暂时先不考虑这种状态
if (ws > 0) {
//则为当前获取锁失败的线程节点找到一个 默认状态 或者 SIGNAL状态的前驱节点。
//换句话说 就是将获取锁失败的线程节点 前的所有CANCELLED状态的节点出队。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//若前驱节点是0默认状态 则设置为-1 SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
前提说明: Node节点在ReentrantLock
中有 三种状态
- 默认为0 也就是初始状态
- 值为 -1 SIGNAL 说明如果当该节点释放锁后 则会去unpark 唤醒并通知其后继节点来获取锁
- 值为 1 CANCELLED 说明该节点处于取消状态,不再会参与抢锁释放锁的过程了,要被从队列中移除。
小结:
返回true 说明已经为当前获取锁失败的节点找到了SIGNAL状态的节点,放心的去park等待了,以便该SIGNAL节点释放锁后可以唤醒通知自己。
返回false 说明还没有为当前获取锁失败的节点找到SIGNAL状态的节点,但是后面还会经过自旋再次进入该方法,最终都会找到自己前面的SIGNAL节点。
一句话:为自己在睡觉前找到一个可以将自己叫醒的人。
1.6 parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
该方法很简单,就是调用 LockSupport.park()
,是自己进入等待状态。
2.unlock
//ReentrantLock unlock
public void unlock() {
sync.release(1);
}
//AQS release
public final boolean release(int arg) {
//条件成立 尝试释放锁成功
if (tryRelease(arg)) {
//赋值头指针引用
Node h = head;
//条件成立:唤醒通知头节点的后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
//释放锁成功
return true;
}
//释放锁失败 说明当前锁 不是该线程持有的。
return false;
}
2.1 tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//判断当前持有锁的线程 是不是调用unlock的线程,若不是则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//标记是否完全释放 没完全释放锁是重入情况下
boolean free = false;
//条件成立 完全释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//条件不成立: 锁重入情况下
setState(c);
return free;
}
2.2 unparkSuccessor()
private void unparkSuccessor(Node node) {
//此时node是头节点
//头节点的状态引用赋值
int ws = node.waitStatus;
//头节点是SIGNAL
if (ws < 0)
//条件成立 将状态设置为默认0
compareAndSetWaitStatus(node, ws, 0);
//头节点的后继节点
Node s = node.next;
//从尾部开始往头遍历 找到有效的后继节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果后继节点不是空,则唤醒它 让它来继续释放锁
if (s != null)
LockSupport.unpark(s.thread);
}
至此 ReentrantLock的逻辑就都讲完了,顺便在说一下非公平锁。
非公平锁比公平锁就是在tryAcquire
方法上少了一个方法。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//在这里没有hasQueuedPredecessors 方法
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
少了hasQueuedPredecessors
方法也就意味着,当锁释放后 state=0 时,任何线程都有机会来抢锁。
但是其它没有获取到锁的线程还是要排队,要依次等待着被唤醒。