从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程

从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程

概述

本文将以ReentrantLock为例来讲解AbstractQueuedSynchronizer的运行流程,主要通过源码的方式来讲解,仅包含大体的运行流程,不会过于深入。

ReentrantLock 介绍

ReentrantLock 是JDK提供的可重入锁实现类,可用其替换synchronized来实现锁重入效果;其底层实现主要是依靠AbstractQueuedSynchronizer,本文将通过ReentrantLock来观察AbstractQueuedSynchronizer的运行流程。

AbstractQueuedSynchronizer

介绍

关于AbstractQueuedSynchronizer(以下简称AQS),JDK是这样子描述的:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

大体意思就是“提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)”,AQS不是一个功能完整的类,而是一个提供了一套依赖于FIFO等待队列的流程框架,该框架可用于实现锁等同步器的。AQS中没有使用任何锁相关的API,其实现主要依靠CAS (Compare And Swap),是一个优秀的lock-free 编程实践。

AQS数据结构

AQS中主要包含以下三个字段

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;

这三个字段都标记了volatile,其中headtail主要用于维护AQS的FIFO双向队列,该队列是AQS的核心,只由AQS该维护该队列,子类实现不会去维护该队列;state用于标记当前同步器的转态,AQS不会对该字段做任何操作,该字段由子类去维护,但AQS提供了修改state的方法,其中的compareAndSetState是子类用的最多的,主要用于实现多线程对同步器的抢夺。

AQS 主要方法

在开始了解AQS的运行流程之前,我们先看一下在使用AQS时需要关注的四个方法:


// 抢夺资源流程的入口,AQS暴露出的API,由自定义同步器来调用,ReentrantLock 的lock方法就是去调用该方法。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 由子类实现该方法,抢占资源逻辑在这个方法实现,该方法由AQS在抢夺资源流程中调用。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 释放资源流程的入口,AQS暴露出的API,由自定义同步器来调用,ReentrantLock 的unlock方法就是去调用该方法。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 由子类实现该方法,释放资源逻辑在这个方法实现,该方法由AQS在释放资源流程中调用。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

其中acquiretryAcquire 跟同步器资源抢夺相关;releasetryRelease 和同步器资源释放相关。acquirerelease 方法是AQS流程的入口,通过这两个方法来走资源抢夺和资源释放的流程,该流程中包含了FIFO队列维护、线程状态管理等操作,是整个AQS的核心,自定义同步器中会去调用这两个方法;而tryAcquiretryRelease 方法对应线程资源抢占和释放操作,这两个方法中只关心线程是否抢占/释放资源成功,不会维护FIFO队列和线程状态,由子类来实现这两个方法,这个是自定义同步器的核心代码,由这两个方法来实现不同同步器的不同功能。

这四个方法对应的是线程独占流程,共享流程使用的是acquireShared, tryAcquireShared, releaseShared, tryReleaseShared 这四个方法,ReentrantLock是线程独占模式,所以本文主要讲解线程独占流程,但线程共享流程和独占流程差别不大,感兴趣的同学可以自行了解。

下面看一下ReentrantLock中是如何去使用这四个方法的:

// 简化版代码,方便演示
public class ReentrantLock implements Lock, Serializable {
    private final Sync sync = new Sync();
    
    public void lock() {
        this.sync.acquire(1);
    }
    
    public void unlock() {
        this.sync.release(1);
    }

    static final class Sync extends AbstractQueuedSynchronizer {

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 当前锁没有被占用
                if (compareAndSetState(0, acquires)) { // 尝试去拿锁
                    setExclusiveOwnerThread(current); // 标记当前线程为锁的持有者
                    return true; // 返回拿锁成功
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 当前锁被占用,且是被自己占用,走重入逻辑,对state做累加
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 锁被占用或者拿锁失败,返回拿锁失败
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 锁被完全释放
                free = true; // 标记锁已被释放
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

    }

可以看出,ReentrantLock的核心功能是由内部类Sync来完成的,而这个Sync类就是继承了AQS,并重写了tryAcquiretryRelease 方法,这两个方法的实现也很简单

  • tryAcquire 中主要是通过state字段是否等于0来判断当前锁是否锁住了,没有锁住,则当前线程使用CAS去尝试将state标记为acquires,标记成功则代表拿锁成功,返回true,否则返回false
  • tryRelease 中则是通过判断释放后state是否为0来判断当前锁是否被完全释放,若完全释放则返回true,否则返回false

可以看出这个两个方法的实现只关注锁的占用和释放是否成功,没有关心FIFO队列和线程状态。那么FIFO队列和线程状态是如何来维护的呢?答案就是在acquirerelease 方法中;从ReentrantLock源码也可以看到,lockunlock方法只是调用了一下acquirerelease 方法,所以接下来我们重点来看一下这两个方法的实现

Node

开始前,先看一下队列Node的数据结构

static final class Node {

        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;
      
        volatile int waitStatus;

        volatile Node prev;
 
        volatile Node next;

        volatile Thread thread;
}

其中waitStatus字段代表了节点的等待状态,共包含了5个值。

  1. INIT(0): 节点的默认状态为0。当线程释放资源后,也会将自己的节点状态设置为0。
  2. CANCELLED(1): 当前线程节点已取消等待,为CANCELLED的节点会在acquire方法里的流程中被清除。
  3. SIGNAL(-1): 当前节点的后续节点已沉睡,需要被唤醒。会在release方法里的流程中将其后续节点唤醒。
  4. CONDITION(-2)PROPAGATE(-3) 则和条件锁及共享模式有关,本文不过多解释。

其中可以看出,状态可以分为两类,无效转态和有效状态,大于0则代表当前节点无效了,需要被移除,小于等于0则是有效节点,需要继续去尝试获取资源。

acquire

先看一下acquire方法的实现

public final void acquire(int arg) {
        // 调用我们自己实现的tryAcquire去获取资源,获取失败则尝试将自己加入到等待队列中
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }

再看一下addWaiter方法

private Node addWaiter(Node mode) {
        Node node = new Node(mode);
        // 为当前线程新建一个node,并将其加入到队列尾部,这里需要注意的是添加时是先将新节点的prev设置为老的尾部节点,使用CAS将新节点设置为tail后才会将老节点的next设置为新节点,
        // 这样做的理由是为了防止并发问题,后续的对队列的修改也是包括两种遍历,一个是从前往后,一个是从后往前。
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

添加完节点后,就会尝试去从队列中获取资源,这里也是AQS的核心了,看一下acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                // 取到当前节点的上一个节点
                final Node p = node.predecessor();
                // 如果上一个节点是头节点,则证明前面没有在等待的线程了,轮到当前线程去尝试获取资源。
                if (p == head && tryAcquire(arg)) {
                    // 获取资源成功,则将当前节点设为头节点,并设置为空节点
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                // 判断前面一个节点是否是一个有效的等待线程节点(没有取消等待的线程),是则将自己睡眠,不是则将前面已取消的线程节点移除,直到找到一个有效的线程节点作为自己的前序节点并将其状态设为SIGNAL,然后再走一次循环。
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

总结一下acquire方法的流程:

  1. 调用子类实现的tryAcquire方法去获取资源。
  2. 获取成功则当前线程继续执行,否则将自己加入到等待队列。
  3. 进入等待队列后,循环的去判断自己的上一个节点是否是头结点,是则再次调用子类实现的tryAcquire方法去获取资源,否则进入沉睡,等待唤醒。
  4. 在沉睡前,会将前面已取消的线程节点移除,直到找到一个有效的线程节点做为自己的前序节点,并将其状态设为SIGNAL。

一句话就是acquire方法会去尝试获取资源,获取失败则将自己加入到等待队列,并沉睡,等待唤醒。

举个例子,demo1线程获取资源失败后,当前的队列状态为:
从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程
若是有多个线程等待,则队列状态为:
从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程

由图可以看出,若当前节点已沉睡,则需要在沉睡前将前一个节点的转态设为SIGNAL

release

前面的acquire方法说到,等待队列中的线程会进入沉睡,等待唤醒,那么由谁来唤醒呢?答案就是release方法,也就是我们在调用unlock的时候,接下来我们先看一下release方法的实现

public final boolean release(int arg) {
        // 调用子类实现的tryRelease方法去释放资源
        if (tryRelease(arg)) {
            Node h = head;
            // 释放成功,且等待队列中还有节点,则去唤醒下一个等待线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

再看一下unparkSuccessor方法

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        Node s = node.next;
        // 如果下一个节点已经取消了,则从后往前遍历,找到第一个等待线程节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 找到了等待线程节点,并将其唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

唤醒后线程将会继续在acquireQueued方法里执行,并尝试再去获取资源。
总结一下release方法的流程:

  1. 调用子类实现的tryRelease方法去释放资源
  2. 释放成功则判断当前队列是否有等待线程
  3. 有则找到第一个等待线程,并将其唤醒。

从流程上也可以看出,若tryRelease方法中抛了异常,则会导致所有沉睡的线程将无法被唤醒。

AQS总结

AQS是模版设计模式的一种实践,其将线程调度等通用逻辑进行了实现,只预留了tryAcquiretryRelease 方法供子类实现自己的并发工具,大大的减轻了实现的复杂难度。

总结

ReentrantLock 是使用AQS实现的,其主要是实现了AQS的tryAcquiretryRelease 方法,且只需要在lockunlock方法中调用一下acquirerelease 方法即可,可以看出AQS非常强大!

参考文章

从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程

上一篇:用window.URL全局对象


下一篇:前端工程化最佳实践