【架构师面试-JUC并发编程-8】-AQS

1:为什么需要AQS

锁和协作类(信号量)有共同点:类似一个闸门(只允许部分线程通过),因为它们底层都用一个共同的基类AQS

因为上面的那些协作类,它们有很多工作类似,所以可以提取出一个工具类,就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的业务逻辑就可以。

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架),有了AQS以后,更多的协作工具类都可以很方便的写出来。

2:Semaphore和AQS关系

  Semaphore内部有一个Sync类,Sync类继承了AQS

    类似面试中的群面和单面

安排就坐、叫号、先来后到等HR的工作就是AQS所做的,面试官(并发类)不关心两个面试者是不是号码会冲突,也不用管面试者需要一个地方坐关等待,这些都交给HR(AQS去做)

Semaphore:一个人面试完后,后一个人才能进来继续面试(许可证)

CountDownLatch:群面,需要等待10个人到齐一起面试。

Semaphore、CountDownLatch这些同步工具类,要做的就是写下自己”要人”的规则,比如”出一个,再进一个”或”凑齐10人一起面试”。

剩下的招呼面试者的活都由AQS来做。

3:AQS原理

1:state:会根据具体实现类的不同而不同

比如在Semaphore里,它表示”剩余的许可证数量”

在CountDownLatch里,它表示”还需要倒数的数量”。

在ReentrantLock中,state表示锁的占有情况,包括可重入计数。

state是volatile修饰,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState、compareAndSetState操作来读取和更新这个状态,这些方法都依赖于juc.atomic包支持。

2:控制线程抢锁和配合的FIFO队列

这个队列用来存放”等待的线程”,AQS就是”排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚释放的锁。

AQS会维护一个等待的线程队列,把线程都放到这个双向链表队列中。

【架构师面试-JUC并发编程-8】-AQS

 

head是拿到锁的线程,后面是等待拿锁的线程。

3:期望协作工具类去实现的获取/释放方法

这里获取和释放方法是利用AQS协作工具类里最重要的方法,是由协作类自己去实现,并且含义各不相同。

获取操作依赖state变量,经常会阻塞(获取不到锁时),如在Semaphore中,获取就是acquire方法,作用是获取一个许可证,如可state变量>0,则可以获取,获取到后值会减1

释放操作不会阻塞,如在Semaphore中,释放就是release方法,作用是释放一个许可证,使state变量加1

4:AQS用法

第一步:写一个类,想好协作逻辑,实现获取和释放方法

第二步:内部写一个Sync类继承AbstractQueuedSynchronized

第三步:根据是否独占或共享来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int release)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或Shared方法。

5:利用AQS实现自己的门闩

aqs/OneShotLatch.java

package aqs;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
 * 描述:自己用AQS实现一个简单的线程协作器
 */
public class OneShotLatch {
    private final Sync sync = new Sync();
 
    //获取锁,利用AQS提供的
    public void await() {
        sync.acquireShared(0);
    }
    //释放锁
    public void signal() {
        sync.releaseShared(0);
    }
 
    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            //state=1放行
            return (getState() == 1) ? 1 : -1;
        }
 
        @Override
        protected boolean tryReleaseShared(int arg) {
           setState(1);
           //唤醒所有其它等待的线程
           return true;
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
     System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                    oneShotLatch.await();
                 System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
                }
            }).start();
        }
        //5秒后放闸
        Thread.sleep(5000);
        //唤醒等待线程。
        oneShotLatch.signal();
    }
}

上一篇:【JDK源码】同步系列AQS之条件锁


下一篇:达梦数据库-归档不完整问题浅谈