ReentrantLock
ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源。相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继承ReentrantLock,并重写部分方法使其更加高效。
当一个线程调用ReentrantLock.lock()方法时,如果ReentrantLock没有被其他线程持有,且不存在额外的线程与当前线程竞争ReentrantLock,调用ReentrantLock.lock()方法后当前线程会占有此锁并立即返回,ReentrantLock内部会维护当前线程对锁的引用计数,当线程获取锁时会增加其线程对锁的引用计数,当线程释放锁时会减少线程对锁的引用计数,当前线程如果在占有锁之后,又重复获取锁,则会增加锁的引用计数,当锁的引用计数为0的时候,代表当前线程完全释放锁。需要注意的是,只有占有锁的线程才会增加锁的引用计数,当锁被占据时,如果有其他线程要竞争锁,ReentrantLock会把其他线程加入一个竞争锁的队列,并让线程陷入阻塞,直到占据锁的线程释放了锁,ReentrantLock才会唤醒队列中的线程重新竞争锁。
我们用下面的例子来加深对于锁的理解,假设我们的进程内目前没有任何线程竞争lock,此时锁的引用计数为0,有一个线程Thread-1调用完下面<1>处的lock()方法成功占有锁,此时锁的引用计数由0变为1。之后Thread-1调用了<2>处的methodB()方法,methodB()的<4>处又获取了一次锁,由于lock已经被Thread-1占据,所以这里简单的对锁的引用计数+1即可,此时锁的引用计数为2,Thread-1执行完methodB()的方法体后,执行<5>处的unlock()方法释放锁,这里对锁的引用计数-1,由2变为1。在调用完methodB后,执行methodA的方法体,最后执行<3>处的unlock()方法,将锁的引用计数由1变为0,Thread-1完全释放锁。此时,锁变为无主状态。
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock();//<1> methodB();//<2> //methodA body... } finally { lock.unlock();//<3> } } public void methodB() { try { lock.lock();//<4> //methodB body... } finally { lock.unlock();//<5> } }
ReentrantLock提供了isHeldByCurrentThread()和getHoldCount()两个方法,前者用于判断锁是否被当先调用线程持有,如果被当前调用线程持有则返回true;后者不仅会判断锁是否被当前线程持有,还会返回锁相对于当前线程的引用计数,毕竟锁是可重入的,如果锁没有被任何线程持有,或者被不是持有锁的线程调用getHoldCount()方法,就会返回0。
这两个方法的实现原理也很简单,我们知道在Java中可以调用Thread.currentThread()来获取当前线程对象。当我们调用ReentrantLock.lock()方法成功获取锁之后,ReentrantLock内部会用一个独占线程(exclusiveOwnerThread)字段来标识当前占用锁的Thread线程对象,如果线程释放了锁且锁的引用计数为0,则将独占线程字段标记为null。当要判断锁是否被当前线程持有,或者锁相对于当前线程的引用计数,则获取调用方线程的Thread对象,和内部的独占线程字段做下对比,如果两者的引用相等,代表当前线程占用了锁,如果引用不相等,则表示当前所可能处于无主状态,或者锁被其他线程持有。
如下面的代码,我们希望只有持有lock的线程才可以执行methodB()和methodC()方法,就可以用isHeldByCurrentThread()和getHoldCount()进行判断。
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock(); methodB(); methodC(); //methodA body... } finally { lock.unlock(); } } public void methodB() { if (lock.getHoldCount() != 0) { //methodB body... } } public void methodC() { if (lock.isHeldByCurrentThread()) { //methodC body... } }
需要注意的一点是,官方有给出isHeldByCurrentThread()和getHoldCount()两个方法的使用范围,仅针对于debug和测试。真正的生产环境如果有重入锁的需要,官方还是推荐用try{}finally{}这一套,在try代码块里获取锁,在finally块中释放锁。
创建ReentrantLock对象时,如果使用的是无参构造方法,则默认创建非公平锁(NonfairSync),如果调用的是ReentrantLock(boolean fair)有参构造方法,fair为true则创建公平锁(FairSync)。
public class ReentrantLock implements Lock, java.io.Serializable { //... //默认创建非公平锁 public ReentrantLock() { sync = new NonfairSync(); } //根据参数指定创建公平锁或非公平锁,true为公平锁。 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //... }
之前说过,当有多个线程竞争锁时,获取锁失败的线程,会形成一个队列。如果有多个线程竞争公平锁时,会优先把锁分配给等待锁时间最长的线程,即队头的线程,队列中越往后的线程等待锁的时间越短,排在队尾的线程等待时间最短。如果使用的是非公平锁,则不保证会按照等待时长顺序将锁分配。在多线程的场景下,公平锁在吞吐量方面的表现不如非公平锁,但两者在获得锁和保证不饥饿的差异并不大。
需要注意的是,公平锁不能保证线程调度的公平性,竞争公平锁的多个线程中,可能会出现一个线程连续多次获得锁的情况。比如:Thread-1、Thread-2都要竞争同一个锁(lock),但此时锁已经被其他线程占据,Thread-1、Thread-2竞争失败,预备进入等待队列,这时Thread-1、Thread-2的CPU时间片消耗完毕被挂起,而其他线程刚好释放锁将锁变为无主状态,此时Thread-3抢锁成功,并调用下面的doThread3()方法,连续10次获取锁并释放锁将锁变为无主状态。这种情况,就是上面说的公平锁无法保证线程调度的公平性,按照顺序,Thread-3在Thread-1、Thread-2竞争失败后才开始竞争,按理锁的分配顺序应该是Thread-1->Thread-2->Thread-3,但由于线程的调度问题,Thread-1、Thread-2尚未入队,而锁被释放后刚好被Thread-3“捡漏”
public void methodA() { try { lock.lock(); //methodA body... } finally { lock.unlock(); } } public void doThread3() { for (int i = 0; i < 10; i++) { methodA(); } }
除了调用ReentrantLock.lock()以阻塞的方式直到获取锁,ReentrantLock还提供了tryLock()和tryLock(long timeout, TimeUnit unit)两个方法来抢锁。我们看下面的代码,相信很多同学看到这两个方法后也能知道这两个方法和lock()方法的区别,tryLock()会尝试竞争锁,如果锁已被其他线程占用,则竞争失败,返回false,如果竞争成功,则返回true。tryLock(long timeout, TimeUnit unit)如果竞争锁失败后,会先进入等待队列,如果在过期前能竞争到锁,则返回true,如果在过期时间内都无法抢到锁,则返回false。
public void methodD() { boolean hasLock = false; try { hasLock = lock.tryLock();//<1>非计时 if (!hasLock) {//没有抢到锁则退出 return; } //methodD body... } finally { if (hasLock) { lock.unlock(); } } } public void methodE() { boolean hasLock = false; try { hasLock = lock.tryLock(5, TimeUnit.SECONDS);//<2>计时 if (!hasLock) {//没有抢到锁则退出 return; } //methodE body... } catch (InterruptedException e) { e.printStackTrace(); } finally { if (hasLock) { lock.unlock(); } } }
需要注意的是:不管是公平锁还是非公平锁,不计时tryLock()都不能保证公平性,如果锁可用,即时其他线程正在等待锁,也会抢锁成功。
ReentrantLock内部会用一个int字段来标识锁的引用次数,因此,ReentrantLock虽然作为可重入锁,但它的最大可重入次数为2147483647(即:MaxInt32,2^31-1),不管我们是以递归或者是循环亦或者其他方式,一旦我们重复获取锁的次数超过这个次数,ReentrantLock就会抛出异常。
至此,我们了解了ReentrantLock的简单应用。下面,就请大家一起跟随笔者了解ReentrantLock的实现原理。下面的代码是笔者从ReentrantLock节选的部分代码,可以看到先前我们调用加锁(lock、lockInterruptibly、tryLock)、解锁(unlock)的代码,最后都会调用sync对象的方法,sync对象的类型是一个抽象类,在我们创建ReentrantLock对象时,会根据构造函数决定sync是公平锁(FairSync),还是非公平锁(NonfairSync),FairSync和NonfairSync都继承自Sync,所以ReentrantLock在创建好具体的Sync对象后,便不再管关心公平锁的逻辑或者是非公平锁的逻辑,ReentrantLock只知道抽象类Sync实现了它所需要的功能,这个功能是公平亦或是非公平,由具体的实现子类来关心。
public class ReentrantLock implements Lock, java.io.Serializable { //... private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {//...} static final class NonfairSync extends Sync {//...} static final class FairSync extends Sync {//...} public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } //... }
鉴于ReentrantLock的无参构造函数是创建一个非公平锁,可见官方更倾向于我们使用非公平锁,这里,我们就先从非公平锁开始介绍。
当ReentrantLock为非公平锁时,调用lock()方法会直接调用sync.acquire(1),NonfairSync和Sync两个类都没有实现acquire(int arg),这个方法是由AbstractQueuedSynchronizer(抽象队列同步器,下面简称:AQS)实现的,也就是Sync的父类。
当线程竞争锁时,会先调用tryAcquire(arg)方法试图占有锁,AQS将tryAcquire(int arg)的实现交由子类,由子类决定是以公平还是非公平的方式占有锁,如果竞争成功tryAcquire(arg)则返回true,!tryAcquire(arg)的结果为false,于是就不会再调用<1>处后续的判断,直接返回。如果占有锁失败,这里会先调用addWaiter(Node mode)方法,将当前调用线程封装成一个Node对象,再调用acquireQueued(final Node node, int arg)将Node对象加入到等待队列中,并使线程陷入阻塞。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//<1> selfInterrupt(); } //AbstractQueuedSynchronizer将tryAcquire(int arg)的实现交由子类 //java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
我们先来看NonfairSync实现的tryAcquire(int acquires)方法,这里NonfairSync也是调用其父类Sync的nonfairTryAcquire(int acquires)方法。在AQS内部会维护一个volatile int state,可重入互斥锁会用这个字段存储占有锁的线程对锁的引用计数,即重复获取锁的次数。如果state为0,代表锁目前没有被任何线程占有,这里会用CAS的方式设置锁的引用计数,如果设置成功,则执行<2>处的代码将独占线程(exclusiveOwnerThread)的引用指向当前调用线程,然后返回true表示加锁成功。
如果当前state不为0,代表有线程正独占此锁,会在<3>处判断当前线程是否是独占线程,如果是的话则在<4>处增加锁的引用计数,这里同样是修改state的值,但不需要像<1>处那样用CAS的方式,因为<4>处的代码只有独占线程才可以执行,其他线程都无法执行。需要注意的一点是,state为int类型,最大值为:2^31-1,如果超过这个值state就会变为负数,就会报错。如果一个线程在竞争锁的时候,发现state不为0,且当前线程不是独占线程,则会返回false,表示抢锁失败。
//当调用AQS的acquire(int arg)时,会先调用由子类实现的tryAcquire(int acquires)方法 //java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire protected final boolean tryAcquire(int acquires) { //这里会调用父类Sync的nonfairTryAcquire(int acquires)方法 return nonfairTryAcquire(acquires); } //java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire final boolean nonfairTryAcquire(int acquires) { //获取当前线程对象 final Thread current = Thread.currentThread(); //这里会获取父类AQS的state字段,在可重入互斥锁里,state表示占有锁的线程的引用计数 int c = getState(); //如果state为0,表示目前锁是无主状态 if (c == 0) { //如果锁处于无主状态,则用CAS修改state,如果修改成功,表示占有锁成功 if (compareAndSetState(0, acquires)) {//<1> //占有锁成功后,这里会设置锁的独占线程 setExclusiveOwnerThread(current);//<2> return true; } } else if (current == getExclusiveOwnerThread()) {//<3>如果state不为0,代表现在有线程占据锁,如果请求锁的线程和独占线程是同一个线程,则增加当前线程对锁的引用计数 //锁的最大可重入次数为(2^31-1),超过这个最大范围,int就会变为负数,判断nextc为负数时报错。 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //重新设置state的值 setState(nextc);//<4> return true; } return false; } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... //在可重入互斥锁中,state代表独占线程当前的重入次数 private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } //... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { //... //独占线程,当有线程占据可重入互斥锁时,会用此字段存储占有锁的线程 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
按照AbstractQueuedSynchronizer.acquire(int arg)的逻辑,如果抢锁失败,会继而执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这段代码。这里我们需要先来了解下Node的数据结构,Node类是AQS的一个静态内部类。如果眼尖的同学看到下面的prev和next,一定能很快猜出这就是我们先前所说的等待队列,等待队列实质上是一个双端链表,即每个节点都可以知道自己的前驱,也可以知道自己的后继。
//java.util.concurrent.locks.AbstractQueuedSynchronizer.Node static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; //... volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; //... //返回当前节点的前驱节点 final Node predecessor() { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} //... //创建Node节点 Node(Node nextWaiter) {//<1> this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } }
这里简单介绍下Node的字段:
- prev指向当前节点的前驱节点,next指向当前节点的后继节点。
- thread字段在调用<1>处的构造方法时,会将thread指向当前调用线程的Thread对象。
- waitStatus(等待状态)初始值为0,当waitStatus为SIGNAL(-1)时,表示当前节点的后继节点所指向的线程(node.next.thread)陷入阻塞,当前节点如果被移除(CANCELLED)或在占有锁后要释放锁的时候,需要唤醒后继节点的线程。这里有多种可能导致当前节点的等待状态变为移除,比如调用tryLock(long timeout, TimeUnit unit) 超时会获取到锁,或者调用lockInterruptibly()后线程被中断。
- nextWaiter可以用来表示一个节点的线程到底是独占线程(EXCLUSIVE)还是共享线程(SHARED),独占线程一般用于可重入互斥锁(ReentrantLock)或者可重入读写锁(ReentrantReadWriteLock )的写锁,而共享线程则表示当前线程是可以和其他共享线程一起共享资源的,一般用于可重入读写锁的读锁。
如果对上面Node字段还有不理解的地方不用心急,笔者在后面还会和大家一起深入了解这几个字段。
在简单了解了Node的数据结构后,我们来看看AQS是如何将一个线程封装成一个Node对象,并将其加入到等待队列。addWaiter(Node mode)会根据传入的参数node,决定创建的节点是独占节点还是共享节点,先前ReentrantLock传入的是Node.EXCLUSIVE,所以这里是独占节点,在执行完<1>处的代码后,节点创建完毕,节点的thread字段也保存了当前线程对象的引用。之后会进入<2>处的循环,这里是通过CAS自旋的方式将节点加入到等待队列,之所以用这种方式是因为可能存在多个线程同时要入队的情况,用CAS自旋保证每个节点的前驱和后继的有序性。当节点要入队时,会先获取尾节点,如果在<3>处判断尾节点不为null,则将当前节点的前驱指向尾节点,并用CAS的方式设置当前节点为设置为尾节点,如果原先的尾节点(oldTail)的指向没有被任何线程修改,这里用CAS将当前节点设置成尾节点就会成功,于是原先尾节点的后继指向当前节点,当前节点入队成功。但我们也要考虑尾节点为null的情况,即第一个进入等待队列的节点,此时头节点(header)和尾节点(tail)都为null,这里就会执行<4>处的分支,进行队列初始化。初始化队列的时候,同样存在并发问题,所以这里依旧用CAS初始化头节点成功,再将头节点指向的Node对象赋值给尾节点。初始化队列完毕后,会再开始新的一轮循环,用CAS的方式尝试将节点入队,入队成功后,则返回当前节点。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head;//等待队列的头节点 private transient volatile Node tail;//等待队列的尾节点 //... private Node addWaiter(Node mode) { //为竞争锁的线程创建一个Node对象,并用Node.thread字段存储调用线程Thread对象 Node node = new Node(mode);//<1> for (;;) {//<2> Node oldTail = tail; if (oldTail != null) {//<3> node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else {//<4> initializeSyncQueue(); } } } private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; } private final boolean compareAndSetTail(Node expect, Node update) { return TAIL.compareAndSet(this, expect, update); } //... }
在执行完addWaiter(Node.EXCLUSIVE)确定节点入队后,就要将返回节点传入到方法:acquireQueued(final Node node, int arg)。之前我们说过,抢锁失败的节点会进入一个等待队列,等待锁的分配,我们已经在addWaiter(Node mode)看到线程是如何入队的,那接下来就要看看线程是如何等待锁的分配。在看acquireQueued(final Node node, int arg)之前,我们先来思考下如果是我们自己会如何设计将锁分配给线程?最简单的做法是每个线程都在一个死循环中去轮询锁的状态,如果发现锁处于无主状态并抢锁成功,线程则跳出循环访问资源。但这个做法有个缺点就是会消耗CPU时间片,尤其对于一些优先级不高的线程,相比于优先级高的线程它们可能永远无法竞争到锁,永远访问不到资源处于饥饿状态。那么有没有相比死循环更好的做法呢?我们是否可以先把一个入队的线程阻塞起来,先让它不要消耗宝贵的CPU时间片,当占据锁的线程完全释放锁(state变为0)时,则去唤醒队列中等待时长最长的线程,这样也不用担心优先级低的线程无法与优先级高的线程竞争锁,导致处于饥饿状态,一举两得。
这里我们还要再加深下对等待队列Node的理解才能往下看acquireQueued(final Node node, int arg),大家思考下,Node中的thread字段是用来指向竞争锁的线程对象,通过这个对象,我们可以用释放锁的线程唤醒等待锁的线程,占用锁的线程在完全释放锁将锁变为无主状态后,唤醒等待锁的线程,这个等待锁的线程如果成功占据了锁,是否可以将本身线程中Node.thread置为null?此刻线程已经占据了锁,它不会再陷入阻塞,也不需要有其他的线程来唤醒自身。所以等待队列的头节点的thread(header.thread)字段永远为null,因为锁被头节点的线程所占用。
当然,也可能出现锁被占用但头节点(header)本身就为null,这种情况一般出现在我们初始化好一个ReentrantLock后,只有一个线程占有了锁,此时调用tryAcquire(int acquires)会调用ReentrantLock.Sync.nonfairTryAcquire(int acquires)方法,这个方法只会简单修改state状态,并不会新增一个头节点。除非锁已有线程占据,且出现新的线程竞争锁,这时候新的线程在进入等待队列的时候,会初始化队列,为本身占据锁的线程补上一个头节点,初始化队列的时候调用的是Node的无参构造方法,所以头节点的thread字段为null,表示锁被当前头节点原先指向的线程所占据。
在了解这些基本知识后,下面我们终于可以来看看大家迫不及待的acquireQueued(final Node node, int arg)了。当把封装了当前线程的Node对象传入到acquireQueued(final Node node, int arg)方法时,并不会立即阻塞当前线程等待其他线程唤醒。这里会先在<1>处获取当前节点的前驱节点p,判断p是不是头节点,如果p是头节点,则当前线程即有占有锁的可能。因为占据锁的线程会先释放锁,再通知队列中的线程抢锁。所以会存在当前节点入队前锁已被释放的情况,于是判断前驱节点p是头节点,会再调用tryAcquire(int acquires)方法抢锁,如果抢锁成功,就可以按照我们上面所说的套路,调用setHead(Node node)将当前节点设置为头节点,设置当前节点的线程引用为null,然后返回。
如果当前节点的前驱节点不是头节点,这里就要调用shouldParkAfterFailedAcquire(Node pred, Node node)设置前驱节点的等待状态(waitStatus),先前说过,这个等待状态可以用来表示下个节点的阻塞状态。假设有一个锁已经被其他线程占有,Thread-1、Thread-2要来抢锁,此时必然是抢锁失败的,这里会把Thread-1、Thread-2分别封装成Node1和Node2并进行入队,Node1和Node2初始的等待状态都为0,假定Node1先Node2入队,Node1为Node2的前驱节点(即:Node2.prev=Node1),Node1不是头节点,所以不会去抢锁,这里直接进入<2>处分支的shouldParkAfterFailedAcquire(Node pred, Node node)方法,Node1的初始等待状态为0,所以<3>处和<5>处的分支是进不去的,只能进入<4>处的分支,将Node1的等待状态设置为SIGNAL,表示Node1的后继节点处于等待唤醒状态,然后返回false,于是<2>处的判断不成立,又开始新的一轮循环,假定头节点的线程依旧没释放锁,Node1依旧不是头节点,还是直接执行shouldParkAfterFailedAcquire(Node pred, Node node)方法,此时判断Node2的前驱节点Node1的等待状态为-1,表示可以阻塞Node1后继节点Node2所指向的线程,所以这里会返回true,进入<2>处的分支,调用parkAndCheckInterrupt()方法,在这个方法中会调用LockSupport.park(Object blocker)阻塞当前的调用线程,直到有其他线程调用LockSupport.unpark(Node2.thread)唤醒Node2被阻塞的线程,或Node2.thread被中断才会退出parkAndCheckInterrupt()。我们注意到在<5>处有一个判断,前驱节点的等待状态>0,一般状态为CANCELLED(1),表示前驱节点被移除。之所以会存在被移除的节点,是因为我们可能以tryLock(long timeout, TimeUnit unit)的方式往等待队列中添加节点,如果超时还未获得锁,这个节点就要被移除;我们还可能用lockInterruptibly()的方式往等待队列中添加节点,如果节点所对应的线程被中断,这个节点也处于被移除状态。所以<5>处如果发现前驱节点的等待状态大于0,会一直往前驱节点遍历直到找到等待状态<=0的节点将其作为前驱节点,并将前驱节点的后继指向当前节点。要注意的是,等待状态为-1时,代表当前节点的后继节点等待唤醒,>0的时候,代表当前节点被移除,前者的状态与后继节点有关,后者的状态仅与自身有关。如果在自旋期间线程出现其他异常,则会调用<6>处的代码将节点从等待队列移除,并抛出异常。cancelAcquire(Node node)会在后面介绍,这里我们只要先知道这是一个将节点从队列中移除的方法。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head; //... final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor();//<1> if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node))//<2> interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node);//<6> if (interrupted) selfInterrupt(); throw t; } } //... //设置当前节点为头节点,此时可以清空头节点指向的线程引用 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //... private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)//<3> /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//<5> /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//<4> /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } //... private final boolean parkAndCheckInterrupt() { //阻塞调用线程,可调用LockSupport.unpark(Thread thread)唤醒或由线程中断唤醒。 LockSupport.park(this); //返回线程是否由中断唤醒,返回true为被中断唤醒,但此方法会清除线程的中断标记 return Thread.interrupted(); } //... }
能从boolean acquireQueued(final Node node, int arg)方法中返回的线程,都是成功占有锁的线程,但返回结果分当前线程是否被中断,true为被中断。可能存在这样一种情况,前一个线程释放锁完毕后,即将唤醒后一个线程,此时后一个线程被中断唤醒,后一个线程发现其Node节点的前驱节点为头节点,且锁为无主状态,于是抢锁成功直接返回。这里要标记线程的中断状态interrupted,因为线程会从parkAndCheckInterrupt()中被唤醒,最后会执行Thread.interrupted()返回当前线程是否由中断唤醒,但Thread.interrupted()会清除中断标记,所以在占据锁之后会根据返回的interrupted状态,决定是否设置线程的中断状态。如果一个线程在调用acquireQueued(final Node node, int arg)方法的后都未被中断,直到前一个线程调用LockSupport.unpark(Thread thread)唤醒该线程,那么这个线程就不是用中断的形式唤醒,也就不用设置线程的中断状态。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //根据acquireQueued()的返回,决定是否设置线程的中断标记 selfInterrupt(); } //... static void selfInterrupt() { Thread.currentThread().interrupt(); } //... }