本文以公平锁的角度切入AQS
ReentrantLock
Synchronized | ReentrantLock | |
---|---|---|
锁实现机制 | 对象头监视器模式 | 依赖 AQS |
灵活性 | 不灵活 | 支持响应中断、超时、尝试获取锁 |
释放锁形式 | 自动释放锁 | 显示调用 unlock() |
支持锁类型 | 非公平锁 | 公平锁 & 非公平锁 |
条件队列 | 单条件队列 | 多个条件队列 |
是否支持可重入 | 支持 | 支持 |
写在前面:
分清楚:锁是对外界开放的API,而AQS同步器是对线程进行管理的容器,任务区分开来,研究的是AQS。
(Lock:lock、unlock;AQS:acquire、acquireQueue……)。
抽象同步队列最终的实现是:公平锁或者非公平锁
加锁过程:
加锁:
- 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
- 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
- AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
- tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。
解锁:
- 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
- Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
- Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。
state
state为锁的状态:0~n
state:volatile 、CAS
确保该变量对其他线程也是可见的。
无锁机制改变state的值
1)当state=0时,表示无锁状态
2)当state>0时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放 5 次直到 state=0 其他线程才有资格获得锁
AOS:指定当前的独占锁的线程。
AQS中的队列
特点:
1、先进先出的双端队列
2、通过 Head、Tail 头尾两个节点来组成队列结构,通过 volatile 修饰保证可见性
3、Head 指向节点为已获得锁的节点,head指向的时得到锁的Node节点,是一个虚拟节点,节点本身不持有具体线程;更可以看作是 【有锁节点 + 阻塞队列】
4、获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好【抢锁方式:自适应自旋锁】
节点的状态
关键方法与属性 | 对应含义 |
---|---|
waitStatus | 当前节点在队列中处于什么状态 |
thread | 表示节点对应的线程 |
prev | 前驱指针,指向本节点的上一个节点 |
next | 后继指针,指向本节点的下一个节点 |
predecessor | 返回前驱节点,没有的话抛出 NPE 异常 |
属性值 | 值含义 |
---|---|
0 | Node 被初始化后的默认值 |
CANCELLED | 值为1,由于中断或超时,节点被取消 |
SIGNAL | 值为-1,表示节点的后继节点即将被阻塞;当前节点需要唤醒他的后继节点 |
CONDITION | 值为-2,表示节点在等待队列中,节点线程等待唤醒 |
获取锁_核心方法:
- acquire:自旋获取锁的方式
- tryAcquire:会尝试再次通过 CAS 获取一次锁。
- addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)
- 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
- 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)
- acquireQueued:通过自旋,判断当前队列前置节点是否可以获取锁
- 当前节点就是队列第一个(head.next),有抢锁的资格tryAcquire一次,返回interrupted = false,退出自旋;
- 如果 node 是第一个加入等待队列的,此时 node 的 prev 节点是 head ( new Node() ),node 会先去获取锁,失败后,因为 prev 的 waitStatus = 0,这时候将其 waitStatus 设置为 -1,然后再次循环,再获取锁失败就会调用 parkAndCheckInterrupt 阻塞当前线程;
- shouldParkAfterFailedAcquire 过程中会将队列中处于 CANCELLED = 1 的节点删除。也就是说每添加一个节点,获取锁失败后,都可能会对队列做一遍整理;判断当前节点是否需要park
tryAcquire
尝试拿锁,只有两种情况能够成功:
1)无锁
2)自己人(涉及到锁的重入)
protected final boolean tryAcquire(int acquires) {
// 判断锁状态 和 线程能否重入
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//因为fairSync是公平锁,任何时候都需要检查一下 队列中是否在当前线程之前有等待者..
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter
addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)
- 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
- 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)
acquireQueued
获取锁(tryAcquire)->构造节点(addWaiter)->加入队列(addWaiter)->自旋获取锁(acquireQueued)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
//当前线程是否被中断 = 标记等待过程中是否中断过
boolean interrupted = false;
try {
//自旋..前置节点..Node 要么获取锁,要么中断
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false; //当前线程 获取锁 过程中..没有异常
return interrupted; // 这里才是出口,唯一的出口
}
// 挂起当前线程,并且唤醒之后 返回 当前线程的 中断标记
// 唤醒:1.正常唤醒 其它线程 unpark 2.其它线程给当前挂起的线程 一个中断信号..
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
//interrupted == true 表示当前node对应的线程是被 【中断信号唤醒的...】
interrupted = true;
}
} finally {
//true 表示当前线程抢占锁成功,普通情况下【lock】 当前线程早晚会拿到锁..
//false 表示失败,需要执行出队的逻辑... (响应中断的lock方法时。)
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
1、pred(waitState)为0,当前的结点不需要阻塞。;
2、pred(waitState)为1,跳过;
3、pred(waitState)为-1,park当前节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1、前方节点是处于阻塞,挂起当前线程(申锁节点进行阻塞)
if (ws == Node.SIGNAL) return true;
// 2、队列包含了被解约的节点
if (ws > 0) {
do {
// 跳过所有的取消的Node == 将所有的CANCELLED节点出队
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 3、初始化的Node,默认状态为0。
} else {
// 将前置节点强制设置为 SIGNAl,表示前置节点释放锁之后需要 喊醒我..
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
取消任务
public void unlock() {
sync.release(1);
}
这里主要就是对资源的释放,更新同步状态,然后唤醒下一个等待线程。
release
//AQS#release方法
//ReentrantLock.unlock() -> sync.release()【AQS提供的release】
public final boolean release(int arg) {
//尝试释放锁,tryRelease 返回true 表示当前线程已经完全释放锁
//返回false,说明当前线程尚未完全释放锁..
if (tryRelease(arg)) {
//head什么情况下会被创建出来?
//当持锁线程未释放线程时,且持锁期间 有其它线程想要获取锁时,其它线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁中的
//线程 构建出来一个head节点,然后后续线程 会追加到 head 节点后面。
Node h = head;
//条件一:成立,说明队列中的head节点已经初始化过了,ReentrantLock 在使用期间 发生过 多线程竞争了...
//条件二:条件成立,说明当前head后面一定插入过node节点。
if (h != null && h.waitStatus != 0)
//唤醒后继节点..
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
//Sync#tryRelease()
protected final boolean tryRelease(int releases) {
//减去释放的值..
int c = getState() - releases;
//条件成立:说明当前线程并未持锁..直接异常.,.
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//当前线程持有锁..
//是否已经完全释放锁..默认false
boolean free = false;
//条件成立:说明当前线程已经达到完全释放锁的条件。 c == 0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//更新AQS.state值
setState(c);
return free;
}
unparkSuccessor
1)如果其下一个节点为空,或者其等待状态是取消状态,那么就从后往前找,找到一个等待状态 <=0 的,然后将其唤醒;
2)如果下一个节点不为空,且等待状态 <=0,将其唤醒。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//条件一:
//s 什么时候等于null?
//1.当前节点就是tail节点时 s == null。
//2.当新节点入队未完成时(1.设置新节点的prev 指向pred 2.cas设置新节点为tail 3.(未完成)pred.next -> 新节点 )
//需要找到可以被唤醒的节点..
//条件二:s.waitStatus > 0 前提:s != null
//成立:说明 当前node节点的后继节点是 取消状态... 需要找一个合适的可以被唤醒的节点..
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果找到合适的可以被唤醒的node,则唤醒.. 找不到 啥也不做。
if (s != null)
LockSupport.unpark(s.thread);
}
cancelAcquire
/**
* 当前取消排队的node所在 队列的位置不同,执行的出队策略是不一样的,一共分为三种情况:
* 1.当前node是队尾 tail -> node
* 2.当前node 不是 head.next 节点,也不是 tail
* 3.当前node 是 head.next节点。
*/
private void cancelAcquire(Node node) {
if (node == null) return;
node.thread = null;
Node pred = node.prev;
//滤除所有的无效节点pred.waitStatus > 0 即:1 cancelled
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//条件一:node == tail 成立:当前node是队尾 tail -> node
//条件二:compareAndSetTail(node, pred) 成功的话,说明修改tail完成。
if (node == tail && compareAndSetTail(node, pred)) {
//修改pred.next -> null. 完成node出队。
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//条件一:pred != head 成立, 说明当前node 不是 head.next 节点,也不是 tail【中间的节点】
if (pred != head &&
//条件2.1:成立:说明node的前驱状态是 Signal 状态 不成立:前驱状态可能是0,
((ws = pred.waitStatus) == Node.SIGNAL ||
// 假设前驱状态是 <= 0 则设置前驱状态为 Signal状态..表示要唤醒后继节点。
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
Node next = node.next;
//让pred.next -> node.next ,所以需要保证pred节点状态为 Signal状态。
if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next);
} else {
//head.next节点,直接unpark后继(相当于忽略现在的节点)
unparkSuccessor(node);
}
node.next = node; // help GC,修改了 node 的next 指针,但是保证了 prev 指针的不变
}
}
为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?
执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。
什么情况下会对 Prev 指针进行操作?
shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
参考资料:
《Java并发编程实战》
《小刘源码课程》
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html