【源码解析】ReentrantLock的重入锁分析AQS

本文以公平锁的角度切入AQS

ReentrantLock

Synchronized ReentrantLock
锁实现机制 对象头监视器模式 依赖 AQS
灵活性 不灵活 支持响应中断、超时、尝试获取锁
释放锁形式 自动释放锁 显示调用 unlock()
支持锁类型 非公平锁 公平锁 & 非公平锁
条件队列 单条件队列 多个条件队列
是否支持可重入 支持 支持

写在前面:

分清楚:锁是对外界开放的API,而AQS同步器是对线程进行管理的容器,任务区分开来,研究的是AQS。

(Lock:lock、unlock;AQS:acquire、acquireQueue……)。

抽象同步队列最终的实现是:公平锁或者非公平锁

加锁过程:
【源码解析】ReentrantLock的重入锁分析AQS

加锁:

  • 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
  • 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
  • AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
  • tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。

解锁:

  • 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
  • Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
  • Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。

state

state为锁的状态:0~n
state:volatile 、CAS
    确保该变量对其他线程也是可见的。
    无锁机制改变state的值
    
1)当state=0时,表示无锁状态
2)当state>0时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放 5 次直到 state=0 其他线程才有资格获得锁

AOS:指定当前的独占锁的线程。

AQS中的队列

特点:

1、先进先出的双端队列

2、通过 Head、Tail 头尾两个节点来组成队列结构,通过 volatile 修饰保证可见性

3、Head 指向节点为已获得锁的节点,head指向的时得到锁的Node节点,是一个虚拟节点,节点本身不持有具体线程;更可以看作是 【有锁节点 + 阻塞队列】

4、获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好【抢锁方式:自适应自旋锁】

节点的状态

关键方法与属性 对应含义
waitStatus 当前节点在队列中处于什么状态
thread 表示节点对应的线程
prev 前驱指针,指向本节点的上一个节点
next 后继指针,指向本节点的下一个节点
predecessor 返回前驱节点,没有的话抛出 NPE 异常
属性值 值含义
0 Node 被初始化后的默认值
CANCELLED 值为1,由于中断或超时,节点被取消
SIGNAL 值为-1,表示节点的后继节点即将被阻塞;当前节点需要唤醒他的后继节点
CONDITION 值为-2,表示节点在等待队列中,节点线程等待唤醒

获取锁_核心方法:

  • acquire:自旋获取锁的方式
  • tryAcquire:会尝试再次通过 CAS 获取一次锁。
  • addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)
    1. 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
    2. 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)
  • acquireQueued:通过自旋,判断当前队列前置节点是否可以获取锁
    1. 当前节点就是队列第一个(head.next),有抢锁的资格tryAcquire一次,返回interrupted = false,退出自旋;
    2. 如果 node 是第一个加入等待队列的,此时 node 的 prev 节点是 head ( new Node() ),node 会先去获取锁,失败后,因为 prev 的 waitStatus = 0,这时候将其 waitStatus 设置为 -1,然后再次循环,再获取锁失败就会调用 parkAndCheckInterrupt 阻塞当前线程;
    3. shouldParkAfterFailedAcquire 过程中会将队列中处于 CANCELLED = 1 的节点删除。也就是说每添加一个节点,获取锁失败后,都可能会对队列做一遍整理;判断当前节点是否需要park

tryAcquire

尝试拿锁,只有两种情况能够成功:

1)无锁

2)自己人(涉及到锁的重入)

protected final boolean tryAcquire(int acquires) {
    // 判断锁状态 和 线程能否重入
    final Thread current = Thread.currentThread();
    int c = getState();
    
    if (c == 0) {
        //因为fairSync是公平锁,任何时候都需要检查一下 队列中是否在当前线程之前有等待者..
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }

    return false;
}

addWaiter

addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)

  1. 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
  2. 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)

acquireQueued

获取锁(tryAcquire)->构造节点(addWaiter)->加入队列(addWaiter)->自旋获取锁(acquireQueued)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    //当前线程是否被中断 = 标记等待过程中是否中断过
    boolean interrupted = false;
    try {
        //自旋..前置节点..Node 要么获取锁,要么中断
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;  //当前线程 获取锁 过程中..没有异常
                return interrupted;  // 这里才是出口,唯一的出口
            }

            // 挂起当前线程,并且唤醒之后 返回 当前线程的 中断标记
            // 唤醒:1.正常唤醒 其它线程 unpark 2.其它线程给当前挂起的线程 一个中断信号..
            if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())
                //interrupted == true 表示当前node对应的线程是被 【中断信号唤醒的...】
                interrupted = true;
        }
    } finally {
        //true 表示当前线程抢占锁成功,普通情况下【lock】 当前线程早晚会拿到锁..
   	    //false 表示失败,需要执行出队的逻辑... (响应中断的lock方法时。)
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

1、pred(waitState)为0,当前的结点不需要阻塞。;

2、pred(waitState)为1,跳过;

3、pred(waitState)为-1,park当前节点。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    // 1、前方节点是处于阻塞,挂起当前线程(申锁节点进行阻塞)
    if (ws == Node.SIGNAL)  return true;
    
    // 2、队列包含了被解约的节点
    if (ws > 0) {
        do {
            // 跳过所有的取消的Node == 将所有的CANCELLED节点出队
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
        
    // 3、初始化的Node,默认状态为0。
    } else {
        // 将前置节点强制设置为 SIGNAl,表示前置节点释放锁之后需要 喊醒我..
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

取消任务

public void unlock() {
    sync.release(1);
}

这里主要就是对资源的释放,更新同步状态,然后唤醒下一个等待线程。

release

//AQS#release方法
//ReentrantLock.unlock() -> sync.release()【AQS提供的release】
public final boolean release(int arg) {
    //尝试释放锁,tryRelease 返回true 表示当前线程已经完全释放锁
    //返回false,说明当前线程尚未完全释放锁..
    if (tryRelease(arg)) {

        //head什么情况下会被创建出来?
        //当持锁线程未释放线程时,且持锁期间 有其它线程想要获取锁时,其它线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁中的
        //线程 构建出来一个head节点,然后后续线程  会追加到 head 节点后面。
        Node h = head;

        //条件一:成立,说明队列中的head节点已经初始化过了,ReentrantLock 在使用期间 发生过 多线程竞争了...
        //条件二:条件成立,说明当前head后面一定插入过node节点。
        if (h != null && h.waitStatus != 0)
            //唤醒后继节点..
            unparkSuccessor(h);
        return true;
    }

    return false;
}

tryRelease

//Sync#tryRelease()
protected final boolean tryRelease(int releases) {
    //减去释放的值..
    int c = getState() - releases;
    //条件成立:说明当前线程并未持锁..直接异常.,.
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    //当前线程持有锁..


    //是否已经完全释放锁..默认false
    boolean free = false;
    //条件成立:说明当前线程已经达到完全释放锁的条件。 c == 0
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //更新AQS.state值
    setState(c);
    return free;
}

unparkSuccessor

1)如果其下一个节点为空,或者其等待状态是取消状态,那么就从后往前找,找到一个等待状态 <=0 的,然后将其唤醒;
2)如果下一个节点不为空,且等待状态 <=0,将其唤醒。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;

    
    //条件一:
    //s 什么时候等于null?
    //1.当前节点就是tail节点时  s == null。
    //2.当新节点入队未完成时(1.设置新节点的prev 指向pred  2.cas设置新节点为tail   3.(未完成)pred.next -> 新节点 )
    //需要找到可以被唤醒的节点..

    //条件二:s.waitStatus > 0    前提:s != null
    //成立:说明 当前node节点的后继节点是 取消状态... 需要找一个合适的可以被唤醒的节点..
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }


    //如果找到合适的可以被唤醒的node,则唤醒.. 找不到 啥也不做。
    if (s != null)
        LockSupport.unpark(s.thread);
}

cancelAcquire

/**
 * 当前取消排队的node所在 队列的位置不同,执行的出队策略是不一样的,一共分为三种情况:
 * 1.当前node是队尾  tail -> node
 * 2.当前node 不是 head.next 节点,也不是 tail
 * 3.当前node 是 head.next节点。
 */
private void cancelAcquire(Node node) {

    if (node == null) return;
    node.thread = null;
    Node pred = node.prev;
    //滤除所有的无效节点pred.waitStatus > 0 即:1 cancelled
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    
    //条件一:node == tail  成立:当前node是队尾  tail -> node
    //条件二:compareAndSetTail(node, pred) 成功的话,说明修改tail完成。
    if (node == tail && compareAndSetTail(node, pred)) {
        //修改pred.next -> null. 完成node出队。
        compareAndSetNext(pred, predNext, null);

    } else {
        int ws;

        //条件一:pred != head 成立, 说明当前node 不是 head.next 节点,也不是 tail【中间的节点】
        if (pred != head &&
            //条件2.1:成立:说明node的前驱状态是 Signal 状态   不成立:前驱状态可能是0,
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                // 假设前驱状态是 <= 0 则设置前驱状态为 Signal状态..表示要唤醒后继节点。
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
            Node next = node.next;
            //让pred.next -> node.next  ,所以需要保证pred节点状态为 Signal状态。
            if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next);

        } else {
            //head.next节点,直接unpark后继(相当于忽略现在的节点)
            unparkSuccessor(node);
        }
        node.next = node; // help GC,修改了 node 的next 指针,但是保证了 prev 指针的不变
    }
}

为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?

执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。

什么情况下会对 Prev 指针进行操作?
shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

参考资料:
《Java并发编程实战》
《小刘源码课程》
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

上一篇:学习Pytorch+Python之MNIST手写字体识别


下一篇:ReentrantLock源码