在了解如何加锁时候,我们再来了解如何解锁。可重入互斥锁ReentrantLock的解锁方法unlock()并不区分是公平锁还是非公平锁,Sync类并没有实现release(int arg)方法,这里会实现调用其父类AbstractQueuedSynchronizer的release(int arg)方法。在release(int arg)方法中,会先调用其子类实现的tryRelease(int arg)方法,这里我们看看ReentrantLock.Sync.tryRelease(int releases)的实现。
正常情况下,这段代码只能由占有锁的线程调用,所以这里会先获取锁的引用计数,再减去释放次数,即:c = getState() - releases,然后判断尝试释放锁的线程,是否是独占锁的线程,如果不是则抛出IllegalMonitorStateException异常。如果独占线程在释放锁后锁的引用计数为0,则设置独占线程为null,再设置state为0,代表锁成为无主状态。
如果确定锁成为无主状态后,release(int arg)会检查队列中是否有需要唤醒的线程,如果头节点header为null,则代表除了释放锁的线程,没有任何线程抢锁,如果头节点的等待状态为0,代表头节点目前没有需要唤醒的后继节点。如果头节点不为null且等待状态不为0,则代表队列中可能存在需要唤醒的线程,会进而执行unparkSuccessor(Node node),将头节点作为参数传入。
当我们将头节点传入unparkSuccessor(Node node),如果判断头节点的等待状态<0,则会将头节点的等待状态设置为0,如果头节点的后继节点不为null,或者后继节点尚未被取消,会直接唤醒后继节点,后继节点会退出parkAndCheckInterrupt()方法,acquireQueued(final Node node, int arg)的循环中重新竞争锁,如果竞争成功,则后继节点成为头节点,退出抢锁逻辑开始访问资源,如果竞争失败,这里最多循环两次执行shouldParkAfterFailedAcquire(Node pred, Node node),第一次先将后继节点的前驱节点的等待状态由0改为SIGNAL(1),表示前驱节点的后继节点处于等待唤醒状态,第二次循环如果还是抢锁失败,shouldParkAfterFailedAcquire(Node pred, Node node)判断前驱节点的等待状态为SIGNAL,返回true,后继节点的线程陷入阻塞。需要注意的是,即便是公平锁,也可能存在不公平的情况,公平锁头节点的后继节点,也有可能存在抢锁失败的情况,比如之前说过,公平锁在调用tryLock()时是不保证公平的。
这里我们需要注意一下,后继节点是可能存在被移除的情况,这种情况会在后续讲解tryLock(long timeout, TimeUnit unit)的时候说明,如果一旦出现后继节点被移除的情况(waitStatus > 0),在唤醒后继节点时,会从尾节点向前驱节点遍历,找到最靠近头节点的有效节点。
public class ReentrantLock implements Lock, java.io.Serializable {
//...
private final Sync sync;
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//...
}
//...
public void unlock() {
sync.release(1);
}
//...
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//...
//由子类实现尝试解锁方法。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//...
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//如果后继节点不为null但被移除,会从尾节点向前驱节点遍历,找到最靠前的有效节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
//...
}
可重入互斥锁的tryLock()方法不分公平锁或非公平锁,统一调用ReentrantLock.Sync.nonfairTryAcquire(int acquires),在JUC长征篇之ReentrantLock源码解析(一)已经介绍过nonfairTryAcquire(int acquires)方法了,这里就不再赘述了。
public class ReentrantLock implements Lock, java.io.Serializable {
//...
private final Sync sync;
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//...
}
//...
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//...
}
调用ReentrantLock.tryLock(long timeout, TimeUnit unit)方法时,会调用Sync父类AQS的tryAcquireNanos(int arg, long nanosTimeout)方法,如果线程已被标记为中断,则会进入<1>处的分支并抛出InterruptedException异常,否则先调用tryAcquire(int acquires)尝试抢夺,如果抢锁失败,则会调用doAcquireNanos(int arg, long nanosTimeout)陷入计时阻塞,如果在阻塞期间内线程被中断,则会抛出InterruptedException异常,如果到达有效期后线程还未获得锁,tryAcquireNanos(int arg, long nanosTimeout)将返回false。
当进入doAcquireNanos(int arg, long nanosTimeout)后,会先在<2>处计算获取锁的截止时间,之后和原先的acquireQueued()很像,先把当前线程封装成一个Node对象并加入到等待队列,再判断当前节点的前驱节点是否是头节点,是的话再尝试抢锁,如果抢锁成功则退出。否则用截止时间减去系统当前时间,算出线程剩余的抢锁时间(nanosTimeout)。如果剩余时间<=0的话,代表到达截止时间后线程依旧未占用锁,于是调用<3>处的代表将线程对应的Node节点从等待队列中移除,返回false表示抢锁失败。如果在后面的循环中如果发现当前时间大于截止时间,而线程还未获得锁,代表抢锁失败。
如果剩余时间大于0,则会调用shouldParkAfterFailedAcquire(),如果前驱节点的状态状态为0,则将前驱节点的等待状态设置为-1表示其后继节点等待唤醒,然后在下一次循环的时候,shouldParkAfterFailedAcquire()判断前置节点的等待状态为-1,就有机会阻塞当前线程。但相比acquireQueued()不同的是,这里会判断剩余时间是否大于1000纳秒,如果剩余时间小于等于1000纳秒的话,这里就不会阻塞线程,而是用自旋的方式,直到抢锁成功,或者锁超时抢锁失败。如果自旋期间或者阻塞期间线程被中断,则会在<6>处抛出InterruptedException异常。
public class ReentrantLock implements Lock, java.io.Serializable {
//...
private final Sync sync;
//...
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//...
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
//...
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())//<1>
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
//...
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;//<2>
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);//<3>
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&//<4>
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)//<5>
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();//<6>
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
//...
}
下面,我们来看看AQS是如何移除一个节点的,当要移除一个节点时,会先置空它的线程引用,再在<1>处遍历当前节点的前驱节点,直到找到有效节点(等待状态<=0),找到最靠近当前节点的有效节点pred后,再找到有效节点的后继节点predNext(pred.next),然后我们将当前节点的等待状态置为CANCELLED(1),如果当前节点是尾节点,则用CAS的方式设置尾节点为有效节点pred,如果pred成功设置为为节点,这里还会用CAS的方式设置pred的后继为null,因为尾节点不应该有后继节点,这里用CAS设置尾节点的后继节点是防止pred成为尾节点后,还未置空尾节点的后继节点前,又有新的节点入队成为新的尾节点,并且设置pred的后继节点为最新的尾节点。如果<2>出的代码执行成功,代表当前没有新的节点入队,如果执行失败,代表有新的节点入队,pred已经不是尾节点,且pred的后继已经被修改。
如果node不是尾节点,或者在执行compareAndSetTail(node, pred)的时候,有新节点入队,tail引用指向的对象已经不是node本身,或者在执行compareAndSetTail()执行失败,则会进入<3>处的分支。进入<3>处的分支后,会先判断当前节点的前驱节点是不是头节点,如果是头节点的话,会进入<5>处的分支,唤醒当前节点的后继节点来竞争锁,如果当前节点的后继节点为null或者被取消的话,则会从尾节点开始遍历前驱节点,找到队列最前的有效节点,唤醒有效节点竞争锁。
如果前驱节点不是头节点,且前驱节点的等待状态为SIGNAL或者前驱节点的等待状态为有效状态(waitStatus<=0)且成功设置前驱节点的等待状态为SIGNAL,再判断前驱节点前驱节点的thread引用是否为null,如果不为null则代表前驱节点还不是头节点或者尚未被取消,此时就可以进入<4>处的分支,这里如果判断当前节点的后继节点不为null或者尚未被取消,则用CAS的方式设置前驱节点的后继节点,为当前节点的后继节点。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return; node.thread = null; // Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)//<1>
node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary, although with
// a possibility that a cancelled node may transiently remain
// reachable.
Node predNext = pred.next; // Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);//<2>
} else {//<3>
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {//<4>
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {//<5>
unparkSuccessor(node);
} node.next = node; // help GC
}
}
//...
}
先前我们说过,会存在后继节点指向被取消节点的情况,就是发生在cancelAcquire(Node node)方法里,下图是一个锁的等待队列,N1是队头,N1所对应的线程T1正占有锁进行资源访问,N2和N5调用lock()方法采用非计时阻塞请求锁,除非N2和N5对应的线程获取到锁,否则将永远阻塞;N3和N4调用tryLock(long timeout, TimeUnit unit)方法采用计时阻塞请求锁,如果超时对应的线程还未获取到锁,N3和N4将会从队列中移除,返回抢锁失败。
我们假定N3和N4已经超时,要从队列中移除,看看并发场景下是如何出现有效节点的后继引用指向无效节点,这里笔者稍微简化cancelAcquire(Node node),我们只要专注可能出现有效节点指向无效节点的代码。
假定N3和N4两个节点对应的线程是T3和T4,T3要从等待队列中移除N3,先获取N3的前驱节点pred(N3)为N2,N2是一个有效节点(waitStatus<=0),所以不需要在<1>处遍历,接着在<2>处获取N2的后继节点predNext(N2)为N3,再将N3的等待状态改为CANCELLED,此时T3挂起,T4开始执行。T4线程同样获取N4前驱节点pred(n4)为N3,然后发现N3的等待状态>0,会一直往前遍历到N2,所以N4的前驱节点pred(N4)会为N2,接着T4执行<2>处的代码,predNext(N2)也是N3,此时T4挂起,T3恢复执行。T3判断N3不是尾节点,于是进入分值<4>,T3判断N3的前驱节点N2不是头节点,N2的状态为为SIGNAL,且N2的thread字段不为空,表明N2既没有被取消,也不是头节点,于是进入<5>处的分值,这里获取N3的后继节点N4,由于N4对应的线程T4尚未执行到<3>处的代码,N4的等待状态依旧为SIGNAL,所以T3会进入<6>处的分支,将N2的后继节点指向N4。此时T4开始执行,将N4的等待状态改为CANCELLED,T4进行<4>处的分支,N4的前驱N2不是头节点,等待状态为SIGNAL,且N2的线程引用不为怒null,继而进入<5>处的分值,获取到N4的后继N5,N5不Wie空,且N5的等待状态<=0,进而进入到<6>分支,最后要用CAS的方式尝试将N2的后继节点设置为N5,但这里的设置一定会失败,因为此时N2的后继节点为N4,而T4原先获取到N2的后继节点为N3,出现了有效节点指向无效节点的情况。
private void cancelAcquire(Node node) {
//...
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;//<1>
Node predNext = pred.next;//<2>
node.waitStatus = Node.CANCELLED;//<3>
if (node == tail && compareAndSetTail(node, pred)) {
//...
} else {//<4>
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {//<5>
Node next = node.next;
if (next != null && next.waitStatus <= 0)//<6>
pred.compareAndSetNext(predNext, next);
} else {
//...
}
//...
}
}
最后等待队列的布局如下所示,N2指向无效节点N4,而非N4的后继节点。同时这里也引出另一个问题,哪怕N4不是无效节点,在上面移除节点的代码中,只设置了N2的后继引用指向N4,却没设置N4的前驱引用指向N2,所以这里N4的前驱依旧指向N3。
那么如果真的出现上述这样的情况,ReentrantLock是如何来修复这个队列呢?答案在释放锁的时候调用unparkSuccessor(Node node)。笔者先前在介绍这个方法时,就有意提到后继节点可能指向无效节点,当N1对应的线程使用完锁释放之后,N2对应的线程T2接着使用锁并释放锁,在N2释放锁的时候,发现N2的后继节点已经成为失效节点(waitStatus > 0),这里会从尾节点开始找到队列中最前面的有效节点,然后将其唤醒,这里也就是N5。
private void unparkSuccessor(Node node) {
//...
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
N5在被唤醒后,会调用shouldParkAfterFailedAcquire(Node pred, Node node)发现原先的前驱节点N4的等待状态处于被移除,会进入<1>处的分支,查找到最靠近自己的有效前驱节点,并将前驱节点的后继节点指向自己。这里也能回答我们先前的问题,为何在要移除一个节点时,只修改前驱节点的后继引用为被移除节点的后继,却不将被移除节点的后继节点的前驱引用,指向其前驱,因为在释放锁的时候,会唤醒头节点的后继,如果被唤醒的后继发现自己的前驱已经被移除,会往前查找最靠近自己的有效前驱,这里一般是头节点,接着将头节点的后继引用指向自己,再往后就是我们熟悉的流程了,如果前驱节点是头节点且抢锁成功,则退出acquireQueued()方法进行资源的访问,如果抢锁失败,则最多执行两次shouldParkAfterFailedAcquire()然后陷入阻塞,等待下一次的唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//...
if (ws > 0) {//<1>
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//...
}
return false;
}
我们已经了解了lcok()、tryLock()以及tryLock(long timeout, TimeUnit unit),下面的lockInterruptibly()就变得尤为简单,lockInterruptibly()顾名思义也是无限期陷入阻塞,直到获得锁或者被中断,如果线程本身被中断,在抢锁时会直接抛出InterruptedException异常,否则就开始抢锁,如果抢锁成功则皆大欢喜,抢锁失败则执行doAcquireInterruptibly(int arg),这个方法相信不需要笔者做过多的介绍,基本上很多步骤和方法在上面已经介绍过了。将线程封装成Node节点并入队,判断Node节点的前驱是否是头节点,是的话则试图抢锁,如果不是的话则最多循环两次执行shouldParkAfterFailedAcquire(),然后陷入阻塞,直到前驱节点成为头节点并释放锁将其唤醒,或者线程被中断,抛出InterruptedException异常。
public class ReentrantLock implements Lock, java.io.Serializable {
//...
private final Sync sync;
//...
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//..
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//...
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
//...
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//...
}
最后,笔者就简单介绍下公平锁(FairSync)的tryAcquire(int acquires)方法,如果我们查看ReentrantLock的源码会发现,在执行lock()、tryLock(long timeout, TimeUnit unit)和lockInterruptibly(),这几个方法最终都会调用到AQS对应的三个方法:acquire(int arg)、tryAcquireNanos(int arg, long nanosTimeout)、acquireInterruptibly(int arg),这三个方法在抢锁的时候会优先执行子类实现的tryAcquire(int arg)方法,也就是公平锁(FairSync)或者非公平锁(NonfairSync)实现的tryAcquire(int arg)方法,抢锁失败再执行AQS自身实现的入队、阻塞方法。
下面,我们来看下公平锁实现的tryAcquire(int acquires)方法,其实这个方法的实现也非常简单,先判断锁目前是否处于无主状态,是的话再判断队列中是否有等待线程,确认锁是无主状态且队列中没有等待线程,便开始尝试抢锁,抢锁成功则直接返回。公平锁相较于非公平锁的抢锁逻辑,也仅仅是多了一步而已,判断锁是否无主,是的话再判断队列中是否有等待线程,不像非公平锁,只要判断是无主线程便不再查看等待队列,直接尝试抢锁。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果state为0,且队列中没有等待线程,则尝试抢锁
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}