[^ ]: 以下源码分析基于JDK1.8
ReentrantLock 示例
private ReentrantLock lock = new ReentrantLock(true);
public void f(){
try {
lock.lock();
//do something
}
finally {
lock.unlock();
}
}
源码解析(公平锁-lock流程)
构造方法
//默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//构造参数传入是否使用公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
[^]: 下文主要以公平锁为示例做分析
核心变量
private final Sync sync;
//大名鼎鼎的 AQS
abstract static class Sync extends AbstractQueuedSynchronizer{...}
//队列(链表)头
private transient volatile Node head;
//队列(链表)尾
private transient volatile Node tail;
//状态 state = 0 未加锁 > 0 已经加锁
private volatile int state;
ReentrantLock#lock()
public void lock() {
sync.lock();
}
FairSync#lock()
final void lock() {
acquire(1);
}
AbstractQueuedSynchronizer#acquire()
[^ ]: acquire v.(通过努力、能力、行为表现) 获得; 购得; 获得; 得到;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
//第一步:尝试获取锁,如果获取成功,直接返回
//第二步:加入等待队列
//第三步:再次尝试获取锁
//if(!tryAcquire(arg)){
//加入等待队列
//Node node = addWaiter(Node.EXCLUSIVE);
//入队之后,再次尝试获取锁,在做一次努力,因为有可能此时上一个线程已经释放锁了,获取锁之后会返回是否被打断,如果被打断了,执行 selfInterrupt();
//if(acquireQueued(node,arg)){
//打断
//selfInterrupt();
//}
}
}
FairSync#tryAcquire(arg)
AQS 中并没有实现 tryAcquire
方法,交给了子类实现。
[^ ]: recursive 递归的;循环的
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// acquires = 1
// 获取当前线程
final Thread current = Thread.currentThread();
//先获取加锁状态
int c = getState();
//状态为0,代表没有上锁
if (c == 0) {
//存在并发,重新判断是否直接进行CAS上锁
//hasQueuedPredecessors() 判断当前线程之前是否还有线程在排队等待锁,如果没有,就执行CAS修改state状态,如果修改成功,将当前线程变量赋值
if (!hasQueuedPredecessors() &&
//CAS 0 -> 1
compareAndSetState(0, acquires)) {
//加锁成功,给变量 exclusiveOwnerThread 赋值
setExclusiveOwnerThread(current);
return true;
}
}
//此时已经有线程占有锁,先判断,是否是自己占有锁,如果是自己,那就将 state + 1 实现可重入锁的特性
else if (current == getExclusiveOwnerThread()) {
//是自己占有的,将 state + 1
int nextc = c + acquires;
//int值溢出-一般场景中不会加这么多层
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//更新状态 state
setState(nextc);
return true;
}
// CAS 竞争失败,或者锁已经被其他线程占用,返回 false ,加锁失败
return false;
}
ReentrantLock#hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
//头结点
Node t = tail;
//尾节点
Node h = head;
Node s;
//第一种情况:就一个线程进入,此时 head 和 tail 都为 null,h!=t 不成立,直接返回 false,表示并没有任何线程正在队列中等待
//第二种情况:头部和尾部不一致 s= h.next == null ,按理说如果 头部和尾部不一致,那不会出现 h.next == null 的情况,但是在并发中,是会出现的,所以,说明此时正在有其他线程尝试获取锁,或者正在获取的路上,那么当前线程放弃获取,等其他线程去获取吧
//第三种情况:头结点的下一个节点不为 null ,但是 节点线程不是当前线程,说明前边还有一个线程在等待,当前线程还是老老实实的排队吧,获取锁失败。
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
单纯的看注释肯定也是有点懵逼的,这段代码要结合后续的代码去分析。下面我将结合一个队列模型图来继续分析后续的代码:
场景模拟
场景1:第一个线程 T1 尝试获取锁,此时队列中并没有任何(Node),h!=t
条件不成立,可以去获取锁了。
此时线程 T1获取锁成功,假如它瞬间就执行完了,释放锁,将state
设置为0。线程T2现在准备尝试获取锁了,因为T1已经将锁释放,所以T2会顺利获取锁。所以即使加了锁,在一些线程竞争较少的场景,锁不会影响程序的正常运行,可以忽略。
场景2:当然在高并发业务中,肯定没有这么简单,下面我们考虑线程 T1,T2同时竞争锁的情况,我们回到前面的代码:
FairSync#tryAcquire(arg)
int c = getState();
//状态为0,线程T1 ,T2 都进来了
if (c == 0) {
//此时T1 T2 存在竞争,CAS保证至少有一个能够获取锁,另外一个获取失败,那么假如T1获取成功了,T2获取失败了,此时要调用 addWaiter(Node.EXCLUSIVE) 方法,将T2加入到等待队列中
if (!hasQueuedPredecessors() &&
//CAS 0 -> 1
compareAndSetState(0, acquires)) {
//加锁成功,给变量 exclusiveOwnerThread 赋值
setExclusiveOwnerThread(current);
return true;
}
AbstractQueuedSynchronizer#addWaiter(Node mode)
//ReentrantLock中 mode = Node.EXCLUSIVE 独占锁
private Node addWaiter(Node mode) {
//新生成一个Node,mode 会赋值给nextWaiter(这个先忽略)
Node node = new Node(Thread.currentThread(), mode);
//找到尾部节点
Node pred = tail;
//如果尾部节点不为空,那么将此Node加入到尾部
if (pred != null) {
//将此节点的prev 改为 pred
node.prev = pred;
//CAS设置尾节点,如果成功,返回此节点,否则CAS失败,执行 enq 方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//enq 方法,自旋,保证节点肯定能够入队
enq(node);
return node;
}
从场景2中我们知道,此时 pred
是为 null 的,所以,这里直接走enq
方法
AbstractQueuedSynchronizer#enq(Node node)
[^ ]: 核心关注点:头部节点不参与抢锁
private Node enq(final Node node) {
//自旋,必须将这个节点加入到队列中不可
for (;;) {
//第一次 tail 为null
//再次自旋之后,tail不为空
Node t = tail;
if (t == null) {
//设置头部,这里要注意,并不是直接把 T2 的Node 设置为头部,而是加入了一个新的 thread 为空的节点。用老师的话说就是,就好像买火车票排队一样,第一个人不属于排队,他已经在办理业务了,而从第二个人开始才算排队中,所以此时 head 节点为 new Node()
if (compareAndSetHead(new Node()))
//设置成功之后,进入下一次循环(此时队列见 图2)
tail = head;
} else {
//将尾节点赋值给 T2 所在的 Node
node.prev = t;
//CAS 设置尾节点,如果CAS 失败了,比如有其他线程抢先了,那么继续自旋,直到设置成功(此时队列见 图3)
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
图2:初始化队列
图3:T2加入到队列中
AbstractQueuedSynchronizer#acquireQueued(Node node, int arg)
boolean acquireQueued(final Node node, int arg) {
//失败标志
boolean failed = true;
try {
//是否被打断
boolean interrupted = false;
for (;;) {
//获取上一个节点
final Node p = node.predecessor();
//如果上一个节点为 Head 节点,就尝试获取锁,为什么是 Head 节点就尝试获取锁呢?因为上文我们分析了, Head 节点是不参与抢锁的,再次执行 tryAcquire 方法
if (p == head && tryAcquire(arg)) {
//如果抢到了锁,将此Node赋给Head
setHead(node);
//help GC,移除节点关系
p.next = null;
//获取锁成功
failed = false;
//返回结果
return interrupted;
}
//假如此时并没有获取到锁(场景2 中,T1还在执行,所以T2获取失败),此时要去验证一下,此节点是否需要执行 park,如果需要,就执行park,线程等待。(等唤醒之后,再次进入循环去尝试获取锁)
if (shouldParkAfterFailedAcquire(p, node) &&
/**
//阻塞当前线程,不要继续执行了,等待锁吧
LockSupport.park(this);
return Thread.interrupted();
*/
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果失败了,取消获取
if (failed)
cancelAcquire(node);
}
}
在进入下一个源码之前,我们先看一下 Node
的各个状态
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire(Node pred,Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//初始化状态为 0
int ws = pred.waitStatus;
//如果状态为SIGNAL,代表此线程可以被 park 了,第一次进来状态为0,再次循环之后,状态为SIGNAL,然后执行park操作 (上文代码:acquireQueued:parkAndCheckInterrupt())
if (ws == Node.SIGNAL)
return true;
//取消抢锁
if (ws > 0) {
do {
// PREV->PRED->NODE ====> PREV->NODE (移除pred)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//将 状态设置为 SIGNAL ,从英文注释来看,就是当前状态为 0 或者 PROPAGATE ,(当前场景下 HEAD 状态为 0,CAS 设置状态为-1,注意,这里设置的是当前节点的前一个节点的状态,不是自己),设置完成之后,返回false,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//继续循环(上文代码:acquireQueued: for(;;))
return false;
}
同样,如果T3此时也想获取锁,那么抱歉,加入队列,然后你的前一个节点也不是 Head 节点,直接 park
吧
链表中为什么没有 T1呢?它已经获取锁玩去了,不需要入队。
代码执行流程
总结
本文分析了 ReentrantLock
在使用公平锁下的lock
流程,用一个简单的场景去分析代码,在不同的情况下每段代码的注释是不一样的,所以高并发场景下的代码情况和分支真的非常多,也很复杂。有分析错误的地方欢迎大家指出。
需要关注的地方:
- 链表操作,设置 head ,tail 等
- head 不参与抢锁,thread 为 null
- 两个线程交替执行,并且很快释放锁的情况下,是不需要初始化队列的,即使初始化了队列,第二个线程还是会在入队之后再次尝试一次获取锁,实在获取不到,就 park。
- 第三个线程进来,直接排队,因为T2在前面