1:为什么需要AQS
锁和协作类(信号量)有共同点:类似一个闸门(只允许部分线程通过),因为它们底层都用一个共同的基类AQS
因为上面的那些协作类,它们有很多工作类似,所以可以提取出一个工具类,就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的业务逻辑就可以。
AQS是一个用于构建锁、同步器、协作工具类的工具类(框架),有了AQS以后,更多的协作工具类都可以很方便的写出来。
2:Semaphore和AQS关系
Semaphore内部有一个Sync类,Sync类继承了AQS
类似面试中的群面和单面
安排就坐、叫号、先来后到等HR的工作就是AQS所做的,面试官(并发类)不关心两个面试者是不是号码会冲突,也不用管面试者需要一个地方坐关等待,这些都交给HR(AQS去做)
Semaphore:一个人面试完后,后一个人才能进来继续面试(许可证)
CountDownLatch:群面,需要等待10个人到齐一起面试。
Semaphore、CountDownLatch这些同步工具类,要做的就是写下自己”要人”的规则,比如”出一个,再进一个”或”凑齐10人一起面试”。
剩下的招呼面试者的活都由AQS来做。
3:AQS原理
1:state:会根据具体实现类的不同而不同
比如在Semaphore里,它表示”剩余的许可证数量”
在CountDownLatch里,它表示”还需要倒数的数量”。
在ReentrantLock中,state表示锁的占有情况,包括可重入计数。
state是volatile修饰,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState、compareAndSetState操作来读取和更新这个状态,这些方法都依赖于juc.atomic包支持。
2:控制线程抢锁和配合的FIFO队列
这个队列用来存放”等待的线程”,AQS就是”排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚释放的锁。
AQS会维护一个等待的线程队列,把线程都放到这个双向链表队列中。
head是拿到锁的线程,后面是等待拿锁的线程。
3:期望协作工具类去实现的获取/释放方法
这里获取和释放方法是利用AQS协作工具类里最重要的方法,是由协作类自己去实现,并且含义各不相同。
获取操作依赖state变量,经常会阻塞(获取不到锁时),如在Semaphore中,获取就是acquire方法,作用是获取一个许可证,如可state变量>0,则可以获取,获取到后值会减1
释放操作不会阻塞,如在Semaphore中,释放就是release方法,作用是释放一个许可证,使state变量加1
4:AQS用法
第一步:写一个类,想好协作逻辑,实现获取和释放方法
第二步:内部写一个Sync类继承AbstractQueuedSynchronized
第三步:根据是否独占或共享来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int release)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或Shared方法。
5:利用AQS实现自己的门闩
aqs/OneShotLatch.java
package aqs;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 描述:自己用AQS实现一个简单的线程协作器
*/
public class OneShotLatch {
private final Sync sync = new Sync();
//获取锁,利用AQS提供的
public void await() {
sync.acquireShared(0);
}
//释放锁
public void signal() {
sync.releaseShared(0);
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
//state=1放行
return (getState() == 1) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
//唤醒所有其它等待的线程
return true;
}
}
public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
//5秒后放闸
Thread.sleep(5000);
//唤醒等待线程。
oneShotLatch.signal();
}
}