一、CountDownLatch
1.简介
对计数器源码进行分析,首先介绍下这个工具类的作用:控制并发流程,到了一定数量才能执行。倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作。
CountDownLatch仅有一个构造函数:参数为需要倒数的数值
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
2.类结构分析
CountDownLatch本身是一个没有任何关联关系的工具类。
但是其有一个final修饰的内部类Sync继承了类AbstractQueuedSynchronizer。也就是有名的AQS。
3.AQS
那么什么是AQS,AQS的作用又是什么?
首先放上一张AQS的关系类图
CountDownLatch、ReetrantReadWriteLock等都有类似于线程协作的功能,或者叫同步功能,其实,它们底层都用了一个共同的基类,这就是AQS。对于这些协作类,很多工作都是相似的,因此希望能够抽取出来一个工具类。AQS是一个工具,使用的方式简单来说就是将其放到了要使用的类的内部,作为一个内部类。
AQS保证了的
- 同步状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
AQS的作用
- AQS是一个用于构建锁,同步锁、协作工具类。有了AQS,构建线程协作类就容易多了
AQS的具体内部实现可以看
美团技术团队《从ReentrantLock的实现看AQS的原理及应用》:https://mp.weixin.qq.com/s/sA01gxC4EbgypCsQt5pVog
老钱《打通 Java 任督二脉 —— 并发数据结构的基石》:https://juejin.im/post/5c11d6376fb9a049e82b6253
HongJie《一行一行源码分析清楚AbstractQueuedSynchronizer》:https://javadoop.com/post/AbstractQueuedSynchronizer
爱吃鱼的KK《AbstractQueuedSynchronizer 源码分析 (基于Java 8)》:https://www.jianshu.com/p/e7659436538b
waterystone《Java并发之AQS详解》:https://www.cnblogs.com/waterystone/p/4920797.html
英文论文的中文翻译:https://www.cnblogs.com/dennyzhangdd/p/7218510.html
AQS作者的英文论文:http://gee.cs.oswego.edu/dl/papers/aqs.pdf
这里简单介绍下AQS的使用,甚至小伙伴们可以利用AQS工具类定义出一个自己线程同步协作类
其内部有三个重要的属性及方法
三大部分
-
state 是一个被volitile修饰的int变量,这个state在不同的类有着不同的意义,比如在ReentrantLock中代表锁的占有情况,state值为0的时候,表示lock不被任何线程所占用。在Semaphore里,表示剩余的许可证的数量。CountDownLatch表示还需要倒数的数量。
-
控制线程抢锁和配合的FIFO队列
这个队列用来存放“等待的线程”,AQS就是一个排队管理器,当多个线程争用同一把锁时候,必须有排队机制将那些没能拿到的锁的线程串在一起。当锁释放时,所管理器就会挑选一个合适的线程占有这个刚刚释放的锁
AQS会维护一个等待的线程队列,把线程都放到这个队列里
- 期望协作工具类去实现的获取/释放等重要方法
使用方法
一、 写一个类,想好协作逻辑,实现获取、释放方法
二、内部写一个Sync类继承AbstractQueuedSynchronizer
三、根据是否独占来重写独占重写tryAcquire、tryRelease
共享重写tryReleaseShared等方法。在之前写的获取或释放方法中调用AQS的acquire、release或者shared方法
4.CountDownLatch重要方法
介绍完AQS以及其使用方式,重新将目光放回到利用AQS的一个重要实现CountDownLatch,
CountDownLatch内部类Sync继承了AbstractQueuedSynchronizer类,自然得到了其state属性,在CountDownLatch类中state属性代表的是还需要倒数的数量,这点在前面也有所提到过。
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
有了对于这个属性的理解,我们可以进而对其内部的重要方法进行展开
紧接着就是几个CountDownLatch重要方法的介绍
- getCount()方法
/**
* Returns the current count.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the current count
*/
public long getCount() {
return sync.getCount();
}
源码可以看出其本质非常简单,就是调用了内部类的getCount()方法,在其内部做了什么呢
int getCount() {
return getState();
}
其内部做了一件非常简单的事,那就是返回当前state的值,那么看到这里是不是就能够和最开始的介绍对应上,这个state属性在countDownLatch计数器中就是作为计数器中的数而存在。
- await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
看到await方法又是调用了内部类的acquireSharedInterruptibly()方法,这个方法不需要我们的Sync内部类去重写,而是被定义在了AQS类的内部,我们看它的源码。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在这里我们就可以提取出两个重要的信息
- await()方法是可以被中断的,它是能够响应中断的
- doAcquireSharedInterruptibly(arg)方法很多小伙伴可能对其作用比较陌生,简单的来说这个类的作用就是将线程放到阻塞队列。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
每一个线程是以node节点的形式进行储存。
那么什么时候需要将这个线程放入到阻塞队列呢?
我们可以观察到tryAcquireShared(arg)这个方法的返回值决定了是否这个线程需要被阻塞,这个方法也正是AQS重写策略*享情况下需要被重写的方法。我们观察Sync这个类里面,也确实对这个方法进行了重写。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
这个方法很容易理解,当计数器数值为0的时候,会返回1,1>0 ,条件不成立,线程不会被阻塞;当计数器的数值数值为大于0时,返回值为-1,-1<0,条件成立,线程会被阻塞。这不就正是计数器await()类的实现逻辑。
- countDown()
最后介绍最后一个重要的方法,countDown()方法,使用过计数器的小伙伴都知道,这个方法的作用是使得计数器减1。
public void countDown() {
sync.releaseShared(1);
}
同样的方式,调用到了AQS内部类的函数,这个方法被final修饰不能被重写,我们来看一下这个函数
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
内部的doReleaseShared()方法的作用简单来说就是唤醒阻塞队列中的全部线程
我们来看其内部实现
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
很容易看出其就是遍历了一个个node线程节点,并且内部用到了CAS(一个重要的并发安全实现思想)。
那么什么时候需要唤醒全部线程呢?
到这里我想大家都和我一样已经恍然大悟,tryReleaseShared(arg)这个方法也就正是共享情况下需要被重写的另一个方法。
到了这里我想大家都明白了,我们自定义的这个线程合作工具类,其本质需要自己去完成的就是去判断什么时候去触发AQS工具类为我们提供的并发方法。
我们来看这个方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
这个方法我们用到了另一个重要的并发思想CAS,我们所熟知的乐观锁、AtomicInteger等一些列原子类都是应用了这个思想。
想要进一步了解CAS,可以看下面的介绍。
有了CAS的基础我想我们可以很轻松的理解上面那段代码,它所做的就是用循环的方式去尝试将现有的state数值减一,如果state已经是0了,那么就直接返回false,表示没有等待的线程,因此不需要任何的操作。state不为0,那么就用CAS的方式将state的值减一,用CAS的方式是为了保证state的值不会因为并发出现错乱。设置成功后还要分为两种情况,state值变为0了说明计数器的阀门恰好已经到了,这时候返回true,唤醒所有阻塞的线程;state的值变了,但依然大于0,则仅仅只是将state计数器的值减一,说明阀门的开关还没有被打开,不执行任何其它操作,返回false。
5.CAS
什么是CAS (CompareAndSwap)
思路:我认为V的值应该是A,如果是的话那我就把他改成B,如果不是A(说明被别人改过),那么我就不改了
CAS有三个操作数:内存值V、预期值A、要修改的值b,当且仅当预期值A和内存值V相同时,才能将内存值修改为B,否则什么都不做。最后返回现在的V值
利用到了CPU的特殊指令:保证原子性
在java中是如何利用CAS实现原子操作的
- AutomaticInteger加载Unsafe工具用来直接操作内存数据
- 用Unsafe来实现底层操作
- 用volatile修饰value字段,保证可见性
- getAndAddInt方法分析
让我们来看看AtomicInteger是如何通过CAS实现并发下的累加操作的,以AtomicInteger的getAndAdd方法为突破口。
1 getAndAdd方法
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
可以看出,这里使用了Unsafe这个类,这里需要简要介绍一下Unsafe类:
2 Unsafe类
Unsafe是CAS的核心类。Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门,JDK中有一个类Unsafe,它提供了硬件级别的原子操作。
3 AtomicInteger加载Unsafe工具,用来直接操作内存数据
public class AtomicInteger extends Number implements java.io.Serializable {
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public final int get() {return value;}
}
在AtomicInteger数据定义的部分,我们还获取了unsafe实例,并且定义了valueOffset。再看到static块,懂类加载过程的都知道,static块的加载发生于类加载的时候,是最先初始化的,这时候我们调用unsafe的objectFieldOffset从Atomic类文件中获取value的偏移量,那么valueOffset其实就是记录value的偏移量的。
valueOffset表示的是变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了。value是用volatile修饰的,保证了多线程之间看到的value值是同一份。
4 接下来继续看Unsafe的getAndAddInt方法的实现
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
我们看var5获取的是什么,通过调用unsafe的getIntVolatile(var1, var2),这是个native方法,其实就是获取var1中,var2偏移量处的值。var1就是AtomicInteger,var2就是我们前面提到的valueOffset,这样我们就从内存里获取到现在valueOffset处的值了。
现在重点来了,compareAndSwapInt(var1, var2, var5, var5 + var4)其实换成compareAndSwapInt(obj, offset, expect, update)比较清楚,意思就是如果obj内的value和expect相等,就证明没有其他线程改变过这个变量,那么就更新它为update,如果这一步的CAS没有成功,那就采用自旋的方式继续进行CAS操作。
Unsafe的getAndAddInt方法分析:自旋 + CAS,在这个过程中,通过compareAndSwapInt比较并更新value值,如果更新失败,重新获取,然后再次更新,直到更新成功。
缺点:
- ABA问题,问题由来:一个线程将源数据5改为7,另一个线程又改回了5,然后本线程想要对这个变量操作,原有数据还是5,会认为它从来没有被修改过,进而引发一些逻辑错误的问题
- 解决方法:解决ABA问题,可以仿照数据库的方式,引用版本号的机制,去对比版本号,比对比值更可靠
- 自旋时间过长