AQS

@[toc]

1.学习AQS的思路

  • 了解原理,提高思路
  • 先了解如何使用,应用场景,再去分析它的结构

2.为什么需要AQS

  • 锁的协作类共同点:闸门 (和 ReentrantLock、Semaphore相似)
  • 不仅 ReentrantLock、Semaphore、像CountDownLatch、都有这样的协作 (或者同步)功能,其实,他们底层都用了一个共同的基类,这就是 AQS
  • 因为这些协作类有很多工作都是类似的,所以如果提取出一个工具类,那么就直接就可以用
  • Semaphore 、CountDownLatch 内部有一个类 Sync , Sync类继承了AQS

AQS

3.AQS的作用

如果没有AQS:
: 就需要每个协作工具直接实现
: 线程的阻塞与解除阻塞
: 队列的管理

  • AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便得被写出来
  • 有了AQS,构建线程协作类就容易多了

4.AQS的重要性、地位

AbstractQueuedSynchronizer 是 Doug Lea 写的,从JDK1.5加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,我没用IDE看AQS的实现类,可以发现实现类有这些实现

AQS

5.AQS内部原理解析

AQS三大核心:

  • state
  • 控制线程抢锁和配合的FIFO队列
  • 期望协作工具类去实现的获取/释放等重要方法

State状态

  • 这里等state的具体含义,会根据具体实现类的不同而不同,比如在Semaphore 里,它表示 “ 剩余的许可证数量” ,而CountDawnLatch 里,它表示 “ 还需要倒数的数量 ”
  • state 是 volatile修饰的,会被并发地修改,所以所有修改state的方法要保证线程安全,比如getState 、setState 以及 compareAandState 操作来读取和更新这个状态。这些方法都依赖与j.u.c.atomic包的支持
/**
     * The synchronization state.
     * 线程同步状态 利用来volatile 保证可见性
     */
    private volatile int state;

 /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     * 
     * 利用了 unsafe 类的 cas操作
     * 
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

像在 ReentrantLock中

  • state用来表示 “锁”的占有情况,包括可重入锁计数
  • 当state 的值为 0的时候,标识改Lock不被任何线程所占有

FIFO队列

这个队列用来存放 “等待的线程”,AQS就是 “排对管理器”,当多个线程争用同一把锁的时候,必须有排队机制讲那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程占有这个刚刚释放的锁

获取/释放的方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己实现的,并且含义各不相同

获取:

  • 获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)
  • 在Semaphore 中 ,获取就是acquire 方法,作用就是获取一个许可证
  • 而在CountDownLatch里,获取就是await方法,作用等待,直到倒数结束

释放:

  • 释放操作不会被阻塞
  • 在Semaphore中 ,释放就是 release 方法,作用是释放一个许可证
  • CountDownLatch 里面,获取就是countDown 方法,作用是 “倒数一个数”

6.应用实例,源码解析

我们看下 CountDownLatch的源码

  • 构造方法
    /**
     * 构造函数 , 传入int值,判断小于0 给Sync 赋值
     */
     public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
/**
* Sync 继承类AQS  Sync的构造获
*/
private static final class Sync extends AbstractQueuedSynchronizer {
   

        Sync(int count) {
            setState(count);
        }

        /**
         *把AQS state值设置 newState
         */
       protected final void setState(int newState) {
        state = newState;
    }


/**
 * 调用如下 获取AQS类的  
 * private volatile int state;
 */

 protected final int getState() {
        return state;
    }
  • getCount() 方法

// 调用AQS 的getState()方法
int getCount() {
            return getState();
 }

//AQS返回state的值
protected final int getState() {
        return state;
}
  • await() 方法
//进行等待 调用AQS    sync.acquireSharedInterruptibly(1);
 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //如果小于0 则放入等待队列中
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //包装成Node节点 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        //死循环
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                //中断线程的方法
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//park 调用的 LockSupport.park(this);
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
  • countDown() 方法
// 减一减一 直到0开始唤醒
 public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
        //判断是否等于0 
        if (tryReleaseShared(arg)) {
        // 唤醒所有线程
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 总结

调用CountDownLatch的await方法时,便会尝试获取 "共享锁",不过一开始是获取不到锁的,于是线程阻塞/

而 “共享锁” 可获取到的条件,就是 “锁计数器” 的值为0。

而 “锁计数器初始值为count” 没当一个线程调用 才减一

当count 线程调用countDown()之后 , “锁计数器” 才为 0,而前面提到的等待获取共享个i昂锁的线程才能继续运行

7.AQS优质学习资源

上一篇:记录一次大规模数据库迁移(java)


下一篇:git企业级版本开发,一篇文章足矣