CountDownLatch计数器源码详解

一、CountDownLatch

1.简介

对计数器源码进行分析,首先介绍下这个工具类的作用:控制并发流程,到了一定数量才能执行。倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作。

CountDownLatch仅有一个构造函数:参数为需要倒数的数值

/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
2.类结构分析

CountDownLatch本身是一个没有任何关联关系的工具类。

但是其有一个final修饰的内部类Sync继承了类AbstractQueuedSynchronizer。也就是有名的AQS。

3.AQS

那么什么是AQS,AQS的作用又是什么?

首先放上一张AQS的关系类图

CountDownLatch计数器源码详解

CountDownLatch、ReetrantReadWriteLock等都有类似于线程协作的功能,或者叫同步功能,其实,它们底层都用了一个共同的基类,这就是AQS。对于这些协作类,很多工作都是相似的,因此希望能够抽取出来一个工具类。AQS是一个工具,使用的方式简单来说就是将其放到了要使用的类的内部,作为一个内部类。

AQS保证了的

  1. 同步状态的原子性管理
  2. 线程的阻塞与解除阻塞
  3. 队列的管理

AQS的作用

  • AQS是一个用于构建锁,同步锁、协作工具类。有了AQS,构建线程协作类就容易多了

AQS的具体内部实现可以看

美团技术团队《从ReentrantLock的实现看AQS的原理及应用》:https://mp.weixin.qq.com/s/sA01gxC4EbgypCsQt5pVog
老钱《打通 Java 任督二脉 —— 并发数据结构的基石》:https://juejin.im/post/5c11d6376fb9a049e82b6253
HongJie《一行一行源码分析清楚AbstractQueuedSynchronizer》:https://javadoop.com/post/AbstractQueuedSynchronizer
爱吃鱼的KK《AbstractQueuedSynchronizer 源码分析 (基于Java 8)》:https://www.jianshu.com/p/e7659436538b
waterystone《Java并发之AQS详解》:https://www.cnblogs.com/waterystone/p/4920797.html
英文论文的中文翻译:https://www.cnblogs.com/dennyzhangdd/p/7218510.html
AQS作者的英文论文:http://gee.cs.oswego.edu/dl/papers/aqs.pdf

这里简单介绍下AQS的使用,甚至小伙伴们可以利用AQS工具类定义出一个自己线程同步协作类

其内部有三个重要的属性及方法

三大部分

  • state 是一个被volitile修饰的int变量,这个state在不同的类有着不同的意义,比如在ReentrantLock中代表锁的占有情况,state值为0的时候,表示lock不被任何线程所占用。在Semaphore里,表示剩余的许可证的数量。CountDownLatch表示还需要倒数的数量。

  • 控制线程抢锁和配合的FIFO队列

这个队列用来存放“等待的线程”,AQS就是一个排队管理器,当多个线程争用同一把锁时候,必须有排队机制将那些没能拿到的锁的线程串在一起。当锁释放时,所管理器就会挑选一个合适的线程占有这个刚刚释放的锁

AQS会维护一个等待的线程队列,把线程都放到这个队列里

  • 期望协作工具类去实现的获取/释放等重要方法

使用方法

一、 写一个类,想好协作逻辑,实现获取、释放方法

二、内部写一个Sync类继承AbstractQueuedSynchronizer

三、根据是否独占来重写独占重写tryAcquire、tryRelease

共享重写tryReleaseShared等方法。在之前写的获取或释放方法中调用AQS的acquire、release或者shared方法

4.CountDownLatch重要方法

介绍完AQS以及其使用方式,重新将目光放回到利用AQS的一个重要实现CountDownLatch,

CountDownLatch内部类Sync继承了AbstractQueuedSynchronizer类,自然得到了其state属性,在CountDownLatch类中state属性代表的是还需要倒数的数量,这点在前面也有所提到过。

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

有了对于这个属性的理解,我们可以进而对其内部的重要方法进行展开

紧接着就是几个CountDownLatch重要方法的介绍

  1. getCount()方法
/**
 * Returns the current count.
 *
 * <p>This method is typically used for debugging and testing purposes.
 *
 * @return the current count
 */
public long getCount() {
    return sync.getCount();
}

源码可以看出其本质非常简单,就是调用了内部类的getCount()方法,在其内部做了什么呢

int getCount() {
    return getState();
}

其内部做了一件非常简单的事,那就是返回当前state的值,那么看到这里是不是就能够和最开始的介绍对应上,这个state属性在countDownLatch计数器中就是作为计数器中的数而存在。

  1. await()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

看到await方法又是调用了内部类的acquireSharedInterruptibly()方法,这个方法不需要我们的Sync内部类去重写,而是被定义在了AQS类的内部,我们看它的源码。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在这里我们就可以提取出两个重要的信息

  • await()方法是可以被中断的,它是能够响应中断的
  • doAcquireSharedInterruptibly(arg)方法很多小伙伴可能对其作用比较陌生,简单的来说这个类的作用就是将线程放到阻塞队列。
/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

每一个线程是以node节点的形式进行储存。

那么什么时候需要将这个线程放入到阻塞队列呢?

我们可以观察到tryAcquireShared(arg)这个方法的返回值决定了是否这个线程需要被阻塞,这个方法也正是AQS重写策略*享情况下需要被重写的方法。我们观察Sync这个类里面,也确实对这个方法进行了重写。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

这个方法很容易理解,当计数器数值为0的时候,会返回1,1>0 ,条件不成立,线程不会被阻塞;当计数器的数值数值为大于0时,返回值为-1,-1<0,条件成立,线程会被阻塞。这不就正是计数器await()类的实现逻辑。

  1. countDown()

最后介绍最后一个重要的方法,countDown()方法,使用过计数器的小伙伴都知道,这个方法的作用是使得计数器减1。

public void countDown() {
    sync.releaseShared(1);
}

同样的方式,调用到了AQS内部类的函数,这个方法被final修饰不能被重写,我们来看一下这个函数

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

内部的doReleaseShared()方法的作用简单来说就是唤醒阻塞队列中的全部线程

我们来看其内部实现

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

很容易看出其就是遍历了一个个node线程节点,并且内部用到了CAS(一个重要的并发安全实现思想)。

那么什么时候需要唤醒全部线程呢?

到这里我想大家都和我一样已经恍然大悟,tryReleaseShared(arg)这个方法也就正是共享情况下需要被重写的另一个方法。

到了这里我想大家都明白了,我们自定义的这个线程合作工具类,其本质需要自己去完成的就是去判断什么时候去触发AQS工具类为我们提供的并发方法。

我们来看这个方法

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

这个方法我们用到了另一个重要的并发思想CAS,我们所熟知的乐观锁、AtomicInteger等一些列原子类都是应用了这个思想。

想要进一步了解CAS,可以看下面的介绍。

有了CAS的基础我想我们可以很轻松的理解上面那段代码,它所做的就是用循环的方式去尝试将现有的state数值减一,如果state已经是0了,那么就直接返回false,表示没有等待的线程,因此不需要任何的操作。state不为0,那么就用CAS的方式将state的值减一,用CAS的方式是为了保证state的值不会因为并发出现错乱。设置成功后还要分为两种情况,state值变为0了说明计数器的阀门恰好已经到了,这时候返回true,唤醒所有阻塞的线程;state的值变了,但依然大于0,则仅仅只是将state计数器的值减一,说明阀门的开关还没有被打开,不执行任何其它操作,返回false。

5.CAS

什么是CAS (CompareAndSwap)

思路:我认为V的值应该是A,如果是的话那我就把他改成B,如果不是A(说明被别人改过),那么我就不改了

CAS有三个操作数:内存值V、预期值A、要修改的值b,当且仅当预期值A和内存值V相同时,才能将内存值修改为B,否则什么都不做。最后返回现在的V值

利用到了CPU的特殊指令:保证原子性

在java中是如何利用CAS实现原子操作的

  1. AutomaticInteger加载Unsafe工具用来直接操作内存数据
  2. 用Unsafe来实现底层操作
  3. 用volatile修饰value字段,保证可见性
  4. getAndAddInt方法分析

让我们来看看AtomicInteger是如何通过CAS实现并发下的累加操作的,以AtomicInteger的getAndAdd方法为突破口。

1 getAndAdd方法

public final int getAndAdd(int delta) {    
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

可以看出,这里使用了Unsafe这个类,这里需要简要介绍一下Unsafe类:

2 Unsafe类
Unsafe是CAS的核心类。Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门,JDK中有一个类Unsafe,它提供了硬件级别的原子操作。

3 AtomicInteger加载Unsafe工具,用来直接操作内存数据

public class AtomicInteger extends Number implements java.io.Serializable {
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    public final int get() {return value;}
}

在AtomicInteger数据定义的部分,我们还获取了unsafe实例,并且定义了valueOffset。再看到static块,懂类加载过程的都知道,static块的加载发生于类加载的时候,是最先初始化的,这时候我们调用unsafe的objectFieldOffset从Atomic类文件中获取value的偏移量,那么valueOffset其实就是记录value的偏移量的。

valueOffset表示的是变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了。value是用volatile修饰的,保证了多线程之间看到的value值是同一份。

4 接下来继续看Unsafe的getAndAddInt方法的实现

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

我们看var5获取的是什么,通过调用unsafe的getIntVolatile(var1, var2),这是个native方法,其实就是获取var1中,var2偏移量处的值。var1就是AtomicInteger,var2就是我们前面提到的valueOffset,这样我们就从内存里获取到现在valueOffset处的值了。
现在重点来了,compareAndSwapInt(var1, var2, var5, var5 + var4)其实换成compareAndSwapInt(obj, offset, expect, update)比较清楚,意思就是如果obj内的value和expect相等,就证明没有其他线程改变过这个变量,那么就更新它为update,如果这一步的CAS没有成功,那就采用自旋的方式继续进行CAS操作。

Unsafe的getAndAddInt方法分析:自旋 + CAS,在这个过程中,通过compareAndSwapInt比较并更新value值,如果更新失败,重新获取,然后再次更新,直到更新成功。

缺点:

  1. ABA问题,问题由来:一个线程将源数据5改为7,另一个线程又改回了5,然后本线程想要对这个变量操作,原有数据还是5,会认为它从来没有被修改过,进而引发一些逻辑错误的问题
  2. 解决方法:解决ABA问题,可以仿照数据库的方式,引用版本号的机制,去对比版本号,比对比值更可靠
  3. 自旋时间过长

CountDownLatch计数器源码详解

上一篇:1028 List Sorting (25 分)


下一篇:CentOS7 安装JKD1.8