ReentrantLock 源码分析 以公平锁源码解析为例:
1:数据结构:
维护Sync 对象的引用: private final Sync sync;
Sync对象继承 AQS, Sync 分为两个类:处理公平锁锁和非公平锁:
FairSync NonfairSync
具体的类图如下:
2:接下来重点分析AQS这个类:AbstractQueuedSynchronizer:
AQS中的成员变量:
private transient volatile Node head; //AQS维护队列的头结点
private transient volatile Node tail; // AQS维护队列的尾结点
private volatile int state; // AQS 锁的状态 数量标识锁被获取的次数
下面看看NODE 结点的成员变量:
volatile int waitStatus; //等待状态
volatile Node prev; //前继节点
volatile Node next; //后继节点
volatile Thread thread; //线程对象
Node nextWaiter; //下一个等待节点
从NODE的数据结构可以看出来,AQS里面维护的队列的数据结构是双链表的形式;
3:接下来分析 ReentrantLock 的构造方法:
ReentrantLock lock = new ReentrantLock(true); //传入true,说明是构造公平锁
具体的构造方法如下,返回FairSync 对象:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
4:lock方法的分析:因为是公平锁,所以调用 FairSync 下的lock方法:
final void lock() {
acquire(1);
}
Acquire 的方法如下:
public final void acquire(int arg) { // arg=1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
接下来分两种场景来分析tryAcquire(arg) 1:同一线程第一次或N次获取锁(其他线程没有获取到锁) 2:其他线程已经获取到锁,当前线程尝试去获取锁;
//场景 1:
protected final boolean tryAcquire(int acquires) { // acquires=1
final Thread current = Thread.currentThread(); //当前线程:main-thread
int c = getState(); //如果为第一次获取锁c=0 如果main线程已经获取过锁,则c为加锁的次数
if (c == 0) { //当前线程第一次获取锁
if (!hasQueuedPredecessors() && // hasQueuedPredecessor的分析如下单独分析:
compareAndSetState(0, acquires)) { //cas原子操作 state=1
setExclusiveOwnerThread(current); //独占线程设置为当前线程
return true; //返回true表明加锁成功
}
}
else if (current == getExclusiveOwnerThread()) { // c=n 的情况下
int nextc = c + acquires; // nextc=n+1
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //设置 state=n+1
return true; //获取到锁,返回true
}
return false;
}
说明:hasQueuedPredecessors主要是判断当前线程所在的节点是不是CLH队列的首个位置,这个判断的目的是公平锁的公平获取锁的机制
hasQueuedPredecessors的源码如下:以该线程是第一次获取锁为例分析:
public final boolean hasQueuedPredecessors() {
Node t = tail; // tail=null
Node h = head; // head=null
Node s;
return h != t && //返回false
((s = h.next) == null || s.thread != Thread.currentThread());
}
//场景2 分析:有其他线程未释放锁(main-thread 持有锁,thread-1尝试去获取锁)
protected final boolean tryAcquire(int acquires) { // acquires=1
final Thread current = Thread.currentThread(); //当前线程:thread-1
int c = getState(); // 由于其他持有锁 state 至少为1
if (c == 0) {
if(!hasQueuedPredecessors()&& compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // current=thread1 // getExclusiveOwnerThread()=main
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //此时返回false表明尝试获取锁失败
}
5:上面的场景1 获取到锁后返回true,则lock 方法执行结束。下面分析场景2:
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁失败,返回false
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
!tryAcquire(arg) 返回为true;接下来进入
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个逻辑;
这个方法有两层处理:
1:addWaiter(Node.EXCLUSIVE) 将当前线程调价到CLH队列中
2:acquireQueued();逐步执行CLH队列中的线程,如果当前线程获取到锁则返回,否则,当前线程进行休眠,直到唤醒并重新获取到锁才返回;
以下分析这两个方法:场景:main线程获取到锁但未释放,这是线程 thread-1去获取锁:(假设此时没有其他线程在CLH队列中,即CLH队列为null)
addWaiter(Node.EXCLUSIVE)方法:入参 Node.EXCLUSIVE 为null;
private Node addWaiter(Node mode) { //mode=null EXCLUSIVE 标识节点为独占锁模型
Node node = new Node(Thread.currentThread(), mode); //创建新节点:新节点中线程为当前线程,节点模型为独占锁:thread= thread-1 nextWaiter= mode
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 此时tail=null
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //进入enq方法
return node;
}
Enq() 方法如下:
private Node enq(final Node node) { //入参为上一步新建的节点
for (;;) {
Node t = tail; // 第一次遍历逻辑 tail=null
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //CAS 创建表头 Head
tail = head;
} else {
node.prev = t; //第二次遍历逻辑:node为上面新建的节点:thread= thread-1 nextWaiter= mode, node.prev指向表头 t 为表头
if (compareAndSetTail(t, node)) { //CAS设置队列尾节点为当前node
t.next = node; // 表头后继节点指向当前节点
return t;
}
}
}
}
该场景经过上面的处理之后 CLH队列的数据结构如下:
第一次遍历:
Head,tail节点
第二次遍历:
Head 当前线程node:设置为tail
接下来分析
acquireQueued这个方法
final boolean acquireQueued(final Node node, int arg) { //node为当前线程节点 arg=1
boolean failed = true;
try {
boolean interrupted = false; //当前线程在休眠时,有没有被中断过
for (;;) {
final Node p = node.predecessor(); //获取前继节点,这里为head节点
if (p == head && tryAcquire(arg)) { //这里p== head为true,接下来进入
// tryAcquire(arg)这个方法,tryAcquire(arg)方法前面分析过了,这里返回false;
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 接下来会进入以下的逻辑:下面会分析这两个方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里再次回顾下tryAcquire(arg) 方法:返回fasle
protected final boolean tryAcquire(int acquires) { // acquires=1
final Thread current = Thread.currentThread(); //当前线程 thread-1
int c = getState(); //state=1
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // getExclusiveOwnerThread
获取到的线程是tryAcquire(int acquires)中设置的值 这里是 main; current=thread-1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
1): acquireQueued(final Node node, int arg) 方法中for循环第一次执行shouldParkAfterFailedAcquire(p, node) 方法分析:源码如下:
入参:pred为前继节点,这里是head node为当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // ws=0
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置前继节点waitStatus= Node.SIGNAL,
}
return false; //返回false;
}
For循环第二次执行 shouldParkAfterFailedAcquire(p, node)方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // ws=-1 在第一次已经设置为-1
if (ws == Node.SIGNAL) //返回true
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置前继节点waitStatus= Node.SIGNAL,
}
return false; //返回false;
}
返回true后,接下进入parkAndCheckInterrupt()这个方法:
源码分析如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //阻塞当前线程
return Thread.interrupted(); // 返回线程的中断状态
}
LockSupport.park(this); //作用:前继线程节点的状态是 Node.SIGNAL;挂起当前线程;
Thread.interrupted(); //当前被挂起的线程被前继线程中断,返回线程的中断状态;
下面解释一个线程的行为:LockSupport.park(this) 线程被挂起:
当线程被挂起的时候唤醒的方式有两种:
1:unpark 的方式唤醒,前继节点线程使用完锁后,通过unpark方式唤醒当前线程
2:中断唤醒,其他线程通过 interrupt 中断当前线程
接下来继续分析:acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个时候的重点放在分析 selfInterrupt(); 这个方法上;
进入这个方法的条件是 当前线程被中断过,并且获取锁成功了;
static void selfInterrupt() {
Thread.currentThread().interrupt(); //当前线程产生一个中断,真正被唤醒
}
到此为止,ReentrantLock 的公平锁源码分析结束。