ReentrantLock 的公平锁源码分析

ReentrantLock 源码分析   以公平锁源码解析为例:

1:数据结构:

维护Sync 对象的引用:   private final Sync sync;

Sync对象继承 AQS,  Sync  分为两个类:处理公平锁锁和非公平锁:

FairSync   NonfairSync

具体的类图如下:

ReentrantLock 的公平锁源码分析

2:接下来重点分析AQS这个类:AbstractQueuedSynchronizer:

AQS中的成员变量:

private transient volatile Node head;   //AQS维护队列的头结点

private transient volatile Node tail;     // AQS维护队列的尾结点

private volatile int state;                            // AQS 锁的状态  数量标识锁被获取的次数

下面看看NODE 结点的成员变量:

volatile int waitStatus;   //等待状态

volatile Node prev;      //前继节点

volatile Node next;      //后继节点

volatile Thread thread;   //线程对象

Node nextWaiter;       //下一个等待节点

从NODE的数据结构可以看出来,AQS里面维护的队列的数据结构是双链表的形式;

 
   

ReentrantLock 的公平锁源码分析

3:接下来分析 ReentrantLock  的构造方法:

ReentrantLock lock = new ReentrantLock(true);   //传入true,说明是构造公平锁

具体的构造方法如下,返回FairSync 对象:

public ReentrantLock(boolean fair) {

sync = fair ? new FairSync() : new NonfairSync();

}

4:lock方法的分析:因为是公平锁,所以调用 FairSync 下的lock方法:

final void lock() {

acquire(1);

}

Acquire 的方法如下:

public final void acquire(int arg) {    // arg=1

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

接下来分两种场景来分析tryAcquire(arg)  1:同一线程第一次或N次获取锁(其他线程没有获取到锁)   2:其他线程已经获取到锁,当前线程尝试去获取锁;

//场景 1:

protected final boolean tryAcquire(int acquires) {   // acquires=1

final Thread current = Thread.currentThread();   //当前线程:main-thread

int c = getState();    //如果为第一次获取锁c=0  如果main线程已经获取过锁,则c为加锁的次数

if (c == 0) {     //当前线程第一次获取锁

if (!hasQueuedPredecessors() &&  // hasQueuedPredecessor的分析如下单独分析:

compareAndSetState(0, acquires)) {  //cas原子操作 state=1

setExclusiveOwnerThread(current);  //独占线程设置为当前线程

return true;  //返回true表明加锁成功

}

}

else if (current == getExclusiveOwnerThread()) {  // c=n 的情况下

int nextc = c + acquires;    // nextc=n+1

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);    //设置 state=n+1

return true;  //获取到锁,返回true

}

return false;

}

说明:hasQueuedPredecessors主要是判断当前线程所在的节点是不是CLH队列的首个位置,这个判断的目的是公平锁的公平获取锁的机制

hasQueuedPredecessors的源码如下:以该线程是第一次获取锁为例分析:

public final boolean hasQueuedPredecessors() {

Node t = tail;      // tail=null

Node h = head;    // head=null

Node s;

return h != t &&    //返回false

((s = h.next) == null || s.thread != Thread.currentThread());

}

//场景2 分析:有其他线程未释放锁(main-thread 持有锁,thread-1尝试去获取锁)

protected final boolean tryAcquire(int acquires) {   // acquires=1

final Thread current = Thread.currentThread();   //当前线程:thread-1

int c = getState();    // 由于其他持有锁 state 至少为1

if (c == 0) {

if(!hasQueuedPredecessors()&&  compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {  // current=thread1  // getExclusiveOwnerThread()=main

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;   //此时返回false表明尝试获取锁失败

}

5:上面的场景1 获取到锁后返回true,则lock 方法执行结束。下面分析场景2:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&     //尝试获取锁失败,返回false

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

!tryAcquire(arg)  返回为true;接下来进入

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  这个逻辑;

这个方法有两层处理:

1:addWaiter(Node.EXCLUSIVE)   将当前线程调价到CLH队列中

2:acquireQueued();逐步执行CLH队列中的线程,如果当前线程获取到锁则返回,否则,当前线程进行休眠,直到唤醒并重新获取到锁才返回;

以下分析这两个方法:场景:main线程获取到锁但未释放,这是线程 thread-1去获取锁:(假设此时没有其他线程在CLH队列中,即CLH队列为null)

addWaiter(Node.EXCLUSIVE)方法:入参 Node.EXCLUSIVE 为null;

private Node addWaiter(Node mode) {   //mode=null  EXCLUSIVE 标识节点为独占锁模型

Node node = new Node(Thread.currentThread(), mode); //创建新节点:新节点中线程为当前线程,节点模型为独占锁:thread= thread-1   nextWaiter= mode

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;   // 此时tail=null

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);  //进入enq方法

return node;

}

Enq() 方法如下:

private Node enq(final Node node) {   //入参为上一步新建的节点

for (;;) {

Node t = tail;    // 第一次遍历逻辑 tail=null

if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))  //CAS 创建表头 Head

tail = head;

} else {

node.prev = t;  //第二次遍历逻辑:node为上面新建的节点:thread= thread-1   nextWaiter= mode, node.prev指向表头 t 为表头

if (compareAndSetTail(t, node)) {  //CAS设置队列尾节点为当前node

t.next = node;   // 表头后继节点指向当前节点

return t;

}

}

}

}

该场景经过上面的处理之后 CLH队列的数据结构如下:

第一次遍历:

Head,tail节点

第二次遍历:

Head               当前线程node:设置为tail

 
   

接下来分析

acquireQueued这个方法

final boolean acquireQueued(final Node node, int arg) {  //node为当前线程节点 arg=1

boolean failed = true;

try {

boolean interrupted = false; //当前线程在休眠时,有没有被中断过

for (;;) {

final Node p = node.predecessor(); //获取前继节点,这里为head节点

if (p == head && tryAcquire(arg)) { //这里p== head为true,接下来进入

// tryAcquire(arg)这个方法,tryAcquire(arg)方法前面分析过了,这里返回false;

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

//   接下来会进入以下的逻辑:下面会分析这两个方法

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

这里再次回顾下tryAcquire(arg) 方法:返回fasle

protected final boolean tryAcquire(int acquires) {    // acquires=1

final Thread current = Thread.currentThread(); //当前线程 thread-1

int c = getState();   //state=1

if (c == 0) {

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {  // getExclusiveOwnerThread

获取到的线程是tryAcquire(int acquires)中设置的值 这里是 main; current=thread-1

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

}

1): acquireQueued(final Node node, int arg) 方法中for循环第一次执行shouldParkAfterFailedAcquire(p, node) 方法分析:源码如下:

入参:pred为前继节点,这里是head  node为当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;  // ws=0

if (ws == Node.SIGNAL)

return true;

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

}

return false;  //返回false;

}

For循环第二次执行 shouldParkAfterFailedAcquire(p, node)方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;  // ws=-1   在第一次已经设置为-1

if (ws == Node.SIGNAL)   //返回true

return true;

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

}

return false;  //返回false;

}

返回true后,接下进入parkAndCheckInterrupt()这个方法:

源码分析如下:

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);  //阻塞当前线程

return Thread.interrupted(); // 返回线程的中断状态

}

LockSupport.park(this); //作用:前继线程节点的状态是 Node.SIGNAL;挂起当前线程;

Thread.interrupted(); //当前被挂起的线程被前继线程中断,返回线程的中断状态;

下面解释一个线程的行为:LockSupport.park(this)  线程被挂起:

当线程被挂起的时候唤醒的方式有两种:

1:unpark 的方式唤醒,前继节点线程使用完锁后,通过unpark方式唤醒当前线程

2:中断唤醒,其他线程通过 interrupt 中断当前线程

接下来继续分析:acquire(int arg)

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

这个时候的重点放在分析 selfInterrupt(); 这个方法上;

进入这个方法的条件是 当前线程被中断过,并且获取锁成功了;

static void selfInterrupt() {

Thread.currentThread().interrupt();  //当前线程产生一个中断,真正被唤醒

}

到此为止,ReentrantLock 的公平锁源码分析结束。

上一篇:Redisson分布式锁学习总结:可重入锁 RedissonLock#lock 获取锁源码分析


下一篇:背包问题lingo求解