LongAdder源码解析

目录

1. AtomicLong

AtomicLongJUC包下的原子类,在并发情况下进行计数操作时使用AtomicLong可以保证数据的准确性

下面是AtomicLong类的加1和减1操作的源码

//AtomicLong类的加1和减1操作
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

public final long decrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

我们看到它们都调用了unsafe的getAndAddLong方法,那么我们来看看getAndAddLong的源码

//unsafe类的getAndAddLong方法源码
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}

unsafe类的getAndAddLong方法,我们看到这个方法其实就是CAS+自旋compareAndSwapLong也就是CAS操作,通过调用底层cpu指令执行比较并交换操作,在多线程情况下如果写入失败通过自旋操作重复执行getAndAddLong方法,这也是AtomicLong原子类的瓶颈所在(重复自旋会降低时间效率)。

2. AtomicLong和LongAdder的比较

通过上面的分析我们知道AtomicLong因为自旋所以时间效率大大降低,那么在高并发情况下有没有一个好的方案来进行计数操作呢?

在JDK1.8中Doug Lea大神写了一个LongAdder来解决这个问题,我们先比较一下AtomicLongLongAdder时间效率吧,后面我们会对LongAdder类的源码进行分析,我们执行一下下面的代码:

//AtomicLong和LongAdder性能测试
package LongAdder;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.atomic.LongAdder;

/**
 * AtomicLong和LongAdder性能测试
 */
public class Demo01 {

    public static void main(String[] args) throws InterruptedException {
        testAtomicAndLongAdder(1, 10000000);
        testAtomicAndLongAdder(10, 10000000);
        testAtomicAndLongAdder(100, 10000000);

    }
    public static void testAtomicAndLongAdder(int threadCount, int times) throws InterruptedException {
        System.out.println("threadCount: " + threadCount + ", times: " + times);
        long start = System.currentTimeMillis();
        testLongAdder(threadCount, times);
        System.out.println("LongAdder 耗时:"  + (System.currentTimeMillis() - start) + "ms");
        System.out.println("threadCount: " + threadCount + ", times: " + times);
        long atomicStart = System.currentTimeMillis();
        testAtomicLong(threadCount, times);
        System.out.println("AtomicLong 耗时:"  + (System.currentTimeMillis() - atomicStart) + "ms");
        System.out.println("-----------------------------------------");
    }
    public static void testAtomicLong(int threadCount, int times) throws InterruptedException {
        AtomicLong a = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for(int i = 0; i < threadCount; i++){
            list.add(new Thread(() -> {
                for(int j = 0; j < times; j++){
                    a.incrementAndGet();
                }
            }));
        }
        for(Thread thread : list){
            thread.start();
        }
        for(Thread thread : list){
            thread.join();
        }
        System.out.println("AtomicLong value is : " + a.get());
    }

    public static void testLongAdder(int threadCount, int times) throws InterruptedException {
        LongAdder a = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for(int i = 0; i < threadCount; i++){
            list.add(new Thread(() -> {
                for(int j = 0; j < times; j++){
                    a.increment();
                }
            }));
        }
        for(Thread thread : list){
            thread.start();
        }
        for(Thread thread : list){
            thread.join();
        }
        System.out.println("LongAdder value is : " + a.longValue());
    }
}

下面是执行结果:

LongAdder源码解析

我们可以发现,在并发量较小的时候AtomicLong和LongAdder的时间性能差别不大,但是随着并发量的增加,LongAdder的时间效率是AtomicLong的几十倍,这也说明在时间效率LongAdder的优越性。那么接下来我们就来分析一下为什么LongAdder在并发情况下这么快呢?

3. LongAdder

3.1 LongAdder原理分析

我们先看一张原理图

LongAdder源码解析

LongAdder在高并发情况下的高性能是怎么做到的呢?

1.设计思想上,采用分段的方式降低并发冲突的概率

我们知道,AtomicLong内部有一个value变量保存着实际的值,所有的操作都会围绕着该变量操作。也就是说在高并发的情况下,该变量是一个热点数据(假如有N个线程同时争抢该变量,只会有一个线程成功,其余N-1个线程会自旋)。

LongAdder的设计思路就是分散热点,将value值的新增操作分散到一个数组中,不同线程命中数组的不同槽,各个线程只对自己的槽中的值进行CAS操作,这样热点就分散了,冲突的概率就降低了很多。

LongAdder继承了Striped64这个类,Striped64这个类中有一个全局变量transient volatile long base,在并发度不高的情况下直接通过CAS操作这个base值,如果CAS失败,就对cell[]数组中的cell单元格进行CAS操作,减小冲突的概率。

例如当前类中base = 10,有三个线程对base进行CAS操作,线程1执行成功,此时base = 11,线程2、线程3执行失败后转而对自己的cell单元格内的数值进行CAS操作,此时这两个线程对应的cell单元格分别被设为了1

执行完成后统计累加数据:sum = 11 + 1 + 1 = 13,利用LongAdder类的add()求得的才是实际值,历程图如下:

LongAdder源码解析

实际的值应该是base的值和各个cell单元格内的值累加的和

2.使用Contened注解来消除伪共享

在LongAdder的父类Striped64中存在一个transient volatile Cell[] cells数组,其长度是2的幂次方,每个cell单元格都使用@sun.misc.Contended注解进行修饰,而@sun.misc.Contended注解可以进行缓冲行填充,从而解决伪共享问题。伪共享会导致缓冲行失效缓存一致性开销变大。

@sun.misc.Contended static final class Cell {
 
}

伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的cpu缓存失效,尽管这些变量之间没有任何关系,但由于在主内存中临近,存在于同一个缓存行中,它们的相互覆盖会导致频繁缓存未命中,引发性能下降

解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程访存的变量存在于不同的CacheLine即可,使用多余的字节进行填充可以做到这一点

LongAdder源码解析

缓存行填充对于大多数原子来说是繁琐的,因为它们大多不规则分散在内存中,因此彼此之间不会有太大的干扰。但是,驻留在数组中的原子对象往往彼此相邻,因此在没有这种预防措施的情况下,通常会共享缓存行数据(对性能有巨大的负面影响)。

3.惰性求值

LongAdder只有在使用longValue()获取当前累加值时才会真正的去结算计数的数据。longValue()底层就是调用sum()方法,对basecell数组的数据累加返回,做到数据写入读取分离。

AtomicLong使用**incrementAndGet()**每次都会返回long类型的计数值,每次递增还伴随着数据返回,增加了额外的开销。

3.2 LongAdder源码分析

LongAdder继承了Striped64类,Striped64类中定义了base值和cells数组,而因为LongAdder继承了Striped64类,所以可以直接使用

//Striped64类中的部分源码
transient volatile Cell[] cells;

transient volatile long base;

LongAdder的自增操作是increment(),它的内部其实调用了add(long x)方法

//LongAdder的部分源码
public void increment() {
    add(1L);
}
public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
    	//CASE1
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //CASE1.1
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

我们来介绍一下add(long x)方法的具体原理

变量说明

  • as表示cells数组的引用
  • b表示获取的base值
  • v表示期望值
  • m表示cells数组的长度
  • a表示当前线程命中的cell单元格

条件分析:

CASE1:

if ((as = cells) != null || !casBase(b = base, b + x)) {

**条件1:(as = cells) != null **

​ cells数组不为空

条件2: !casBase(b = base, b + x)

​ 当前线程竞争base失败(只有cells数组为空才可能会走到这一步)

LongAdder源码解析

CASE1.1:

if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))

条件1:as == null || (m = as.length - 1) < 0

​ cells数组为空(只有cells数组为空,且当前线程竞争base失败才可能走到这一步)

LongAdder源码解析

条件2:(a = as[getProbe() & m]) == null

​ 当前线程命中的cell单元格为空(只有cells数组不为空才可能会走到这一步)

LongAdder源码解析

条件3: !(uncontended = a.cas(v = a.value, v + x))

​ 当前线程竞争cas单元格失败*,说明有其他线程和当前线程一起竞争当前单元格(只有cells数组不为空,且当前线程命中的cell单 元格不为空才可能走到这一步)

LongAdder源码解析

/*
这是我的笔记
*/
public void add(long x) {
        //as: cells数组
        //b: base值
        //v:期望值
        //m:cells数组长度
        //a:当前线程名中的cell单元格
        Cell[] as; long b, v; int m; Cell a;


        //条件1:cells数组不为空(这种情况表明应该将数据写入cell单元格中)

        //条件2:当前线程竞争base失败(什么情况下条件2成立:cells数组为空,当前线程竞争cell单元格失败)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //当前线程竞争cas单元格的情况,true为成功,false为失败
            boolean uncontended = true;
            //条件1:cells数组为null(什么情况下可以走到这里:cells单元格为空,竞争base失败)
            if (as == null || (m = as.length - 1) < 0 ||
            //条件2:cell单元格为空(什么情况下可以走到这里来:cells数组不为空,但当前线程命中的cell单元格为空)
                (a = as[getProbe() & m]) == null ||
            //条件3:当前线程竞争cell单元格失败(什么情况下可以走到这里:cells数组不为空,当前线程命中的cell单元格也不为空,但是当前线程竞争cell单元格失败)
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
            //第一个参数是我们要加的数
            //第二个参数是我们要用的算法,这里为null
            //第三个参数是竞争cells单元格的情况(什么情况下uncontended为false,只有当前线程为不空,命中的cell单元格也不为空,但是竞争cell单元格失败才会为false,其他情况都为true)
        }
    }

我们总结一下什么情况下会执行**longAccumulate(x,null,uncontended)**方法:

  1. cells数组为空,且当前线程竞争base失败
  2. cells数组不为空,但当前线程命中的cell单元格为空
  3. cells数组不为空,且当前线程命中的cell单元格也不为空,但是当前线程竞争cell单元格失败(与其他线程命中同一个cell单元格)。

longAccumulate(x, null, uncontended)属于父类Striped64的方法,那么接下来我们学习一下Striped64的源码(当然也会着重学习**longAccumulate(x,null, uncontended)**方法)

//Striped64的部分源码
package java.util.concurrent.atomic;
import java.util.function.LongBinaryOperator;
import java.util.function.DoubleBinaryOperator;
import java.util.concurrent.ThreadLocalRandom;

@SuppressWarnings("serial")
abstract class Striped64 extends Number {
    //静态内部类
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        //利用CAS操作赋值value
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    //电脑的cpu核数(控制cells数组长度的关键条件)
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    //cells数组
    transient volatile Cell[] cells;

    //base值
    transient volatile long base;

    //表示当前有无线程抢到锁,1为有线程抢到锁,0为无线程抢到锁
    transient volatile int cellsBusy;


    Striped64() {
    }

    //CAS操作(对于base)
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    //返回当前线程是否抢到了锁,返回true为抢到了,返回false为没抢到
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    //得到当前线程的哈希值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    //重置当前线程哈希值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
    // 有三种情况会执行该方法:
    // 1. cells数组为空,当前线程竞争base失败
    // 2. cells数组不为空,但是当前线程命中的cell单元格为null
    // 3. cells不为空,且当前线程命中的cell单元格也不为null,但是当前线程竞争cells单元格失败
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        //h表示当前线程的哈希值
        if ((h = getProbe()) == 0) {
            //重置哈希值
            ThreadLocalRandom.current(); // force initialization
            //将哈希值赋值给h
            h = getProbe();
            wasUncontended = true;
        }
        //表示扩容意向,false一定不会扩容,true可能会扩容
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            // as表示cells数组
            // a表示命中的cell单元格
            // n表示cell数组的长度
            // v表示期望值

            //CASE1:cells数组不为空,需要将值写到cell单元格中
            if ((as = cells) != null && (n = as.length) > 0) {

                //CASE1.1:当前线程命中的cell单元格为空,需要new一个cells单元格
                if ((a = as[(n - 1) & h]) == null) {

                    //CASE1.1.1:当前没有其他线程获取到锁
                    if (cellsBusy == 0) {       // Try to attach new Cell

                        //new一个cell单元格
                        Cell r = new Cell(x);   // Optimistically create

                        //CASE1.1.1.1:没有其他线程获取到锁且当前线程获取到锁(casCellsBusy()方法表示获取锁是否成功,成功会把cellsBusy设为1,通知其他线程已经有线程占用,只有当cellsBusy为0时才会获取成功)
                        if (cellsBusy == 0 && casCellsBusy()) {

                            //是否创建成功,标记一下,初始为false
                            boolean created = false;
                            try {               // Recheck under lock
                                //rs表示cells数组引用
                                //m表示cells数组的长度
                                //j表示当前线程命中的cell单元格的下标
                                Cell[] rs; int m, j;
                                //cells数组不为空
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    //当前线程命中的cells单元格为null
                                    rs[j = (m - 1) & h] == null) {
                                    //设置值
                                    rs[j] = r;
                                    //标记创建成功
                                    created = true;
                                }
                            } finally {
                                //恢复为无锁状态
                                cellsBusy = 0;
                            }
                            if (created)
                                //如果成功,跳出循环
                                break;
                            //没成功,自旋
                            continue;           // Slot is now non-empty
                        }
                    }
                    ///扩容意向,强制改为false
                    collide = false;
                }
                //CASE1.2:当前线程竞争cell单元格失败
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //CASE1.3:当前线程竞争cell单元格
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //CASE1.4:当前cells数组长度大于NCPU
                            cells不等于as(其他数组已经扩容过了)
                            就不扩容了
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                //CASE1.5:设置扩容意向为true,但并不是一定扩容
                else if (!collide)
                    collide = true;
                //CASE1.6:真正扩容的逻辑
                //当前没有线程抢到锁,当前线程抢到锁
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //CASE2:当前没有线程抢占锁,还没有其他线程已经初始化cells数组,当前线程抢占到了锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                //初始化成功且已经将值写入cell单元格,退出循环
                //如果init为false,且当前线程进入到了这里,说明已经有其他线程初始化了cells单元格,当前线程要自旋
                if (init)
                    break;
            }
            //CASE3:其他线程正在初始化cells数组,当前线程没有抢到锁,当前线程需要将值累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }


}

我们在学习LongAdder的add(long x)方法的时候,最后会进入longAccumulate方法,longAccumulate方法是add方法的核心,接下来我们来重点学习一下longAccumulate方法的源码

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        //CASE1
        if ((as = cells) != null && (n = as.length) > 0) {
            //CASE1.1
            if ((a = as[(n - 1) & h]) == null) {
                //CASE1.1.1
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    //CASE1.1.1.1
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            //CASE1.2
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            //CASE1.3
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            //CASE1.4
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            //CASE1.5
            else if (!collide)
                collide = true;
            //CASE1.6
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        //CASE2
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //CASE3
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

这个方法很长,我们一点点来分析

首先我们回顾一下什么情况下add()方法会进入longAccumulate()方法

  1. cells数组为空,且当前线程竞争base失败
  2. cells数组不为空,但是当前线程命中的cell单元格为空
  3. cells数组不为空,且当前线程命中的cell单元格也不为空,但是当前线程竞争cell单元格失败

longAccumulate()方法的参数

  • long x : 需要增加的值

  • LongBinaryOperator fn : 默认为null

  • boolean wasUncontended : 表示当前线程竞争cell单元格是否成功,true为成功,false为失败(只有cells数组不为空,且当前线

    ​ 程命中的cell单元格不为空,但是当前线程竞争cell单元格失败时才为false,其余情况为true)

再看一下Striped64中一些变量和方法的定义

  • base:类似于AtomicLong中的全局变量value,在cells数组为空将值写入base或者自己在扩容时其他线程先自己一步扩容也会将值写 入base中。

  • collide:表示扩容意向,collide为false表示一定不会扩容,collide为true表示可能会扩容

  • cellsBusy:初始化或者扩容时需要获取锁0表示没有线程获锁,1表示有其他线程获取到锁

  • casCellsBusy():通过CAS操作修改cellsBusy的值,当cellsBusy为0时将cellsBusy改为1,返回true表示当前线程获取到了锁,当

    ​ cellsBusy为1时返回false,表明当前线程获取锁失败

  • NCPU:表示当前计算机的cpu的数量,在cells数组扩容时会用到

  • getProbe() :获取当前线程的哈希值

  • advanceProbe(int probe):重置当前线程的哈希值

接下来正式分析longAccumulate方法

private static final long PROBE;

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
    	//设置为false,表示不会扩容(这是默认的)
    	boolean collide = false;                // True if last slot nonempty
    
static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

getProbe()方法是获取当前线程的哈希值,具体方法是通过UNSAFE.getInt()实现的

如果当前线程的哈希值h = getProbe()) == 0,0与任何数取模都是0,会固定在数组的第一个位置,所以这里做了优化,使用ThreadLocalRandom.current()重新计算了一个哈希值,最后设置wasUncontended为true,这里的含义是重新计算了当前线程的哈希值后认为此次不算是一次竞争。哈希值被重置就好比一个全新的线程,所以设置了竞争状态为true。

可以画图理解:

LongAdder源码解析

接着执行for循环,我们可以把for循环代码拆分一下,每一个if条件作为一个CASE来分析

for (;;) {
    Cell[] as; Cell a; int n; long v;
    //CASE1:
    if ((as = cells) != null && (n = as.length) > 0) {
 
    }
    //CASE2:
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    }
    //CASE3:
    else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))

}

如上所示,我们将每一个if语句CASE1.1这种形式来讲解

CASE1执行条件:

if ((as = cells) != null && (n = as.length) > 0) {
 
    }

cells数组不为空,且cells数组长度大于0会执行CASE1,CASE1实现的代码比较多,我们放到最后讲解

CASE2执行条件和实现原理:

//CASE2
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try {                           // Initialize table
        //CASE2.1
        if (cells == as) {
            //初始化
            Cell[] rs = new Cell[2];
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

CASE2标识cells数组为空cells == as判断当前获取的cells数组还是和以前的一样(当前没有其他线程初始化cells数组,如果这个条件不成立说明cells数组已经初始化,会执行CASE3竞争base),且当前线程获取到了锁才会执行CASE2。

LongAdder源码解析

我们注意在CASE2里面还有一个if语句if (cells == as) 这是为了防止已经有其他线程初始化了cells数组,如果当前线程仍然继续初始化,那么其他线程初始化后cell单元格里面的数据会丢失

LongAdder源码解析

CASE3执行条件和实现原理

else if (casBase(v = base, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
    break;    

进入到这里说明当前线程正在初始化,执行casBase()方法,通过CAS操作来修改base,这个CASE只有在初始化cells数组的时候,线程在加锁失败的话才会走到这个分支,然后通过CAS操作来修改base值

CASE1实现原理:

进入CASE1的前提是cells数组不为空

接下来我们来一点点的拆分代码,首先看一下CASE1.1

            //CASE1.1
            if ((a = as[(n - 1) & h]) == null) {
                //CASE1.1.1
                if (cellsBusy == 0) {       
                    Cell r = new Cell(x);   
                    //CASE1.1.1.1
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {          
                            Cell[] rs; int m, j;
                            //CASE1.1.1.1.1
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           
                    }
                }
                collide = false;
            }

CASE1.1表示当前线程命中的cell单元格为空,表明没有线程在此处创建对象(只有cells数组不为空才可能走到这里),接着CASE1.1.1

if (cellsBusy == 0)表示当前没有其他线程获取到锁**,将命中的cell单元格创建出来,之后当前线程就要获取锁了**,执行CASE1.1.1.1if (cellsBusy == 0 && casCellsBusy())表明没有其他线程获取锁且当前线程去获取锁成功,之后要判断一下当前线程命中的cell单元格是否还为空,CASE1.1.1.1.1 if ((rs = cells) != null &&(m = rs.length) > 0 && rs[j = (m - 1) & h] == null)就是起这个作用的,之后将值赋值给命中的且已经创建的cell单元格,执行created = true表明赋值成功,之后finally语句中释放锁,之后就可以退出循环了,如果没有赋值成功,那就再来一次自旋

我们看一下最后的 collide = false,只有当前线程命中的cell单元格为null但是当前线程没有获得锁时才会执行,将collide 设为false表明没有扩容想法,也正符合实际情况。

接下来我们来看CASE1.2:


else if (!wasUncontended)      
    wasUncontended = true;      

//CASE1里面的最后
h = advanceProbe(h);

wasUncontended设为false表明当前线程竞争cell单元格失败,之后会执行advanceProbe(h)方法重置哈希值,之后自旋

接着看CASE1.3:

else if (a.cas(v = a.value, ((fn == null) ? v + x :
                             fn.applyAsLong(v, x))))
    break;

能够执行到这里说明cells数组不为空,且这已经是重置过一次的了,这种情况下就竞争cell单元格,竞争成功就退出循环,竞争失败就自旋。

接着看CASE1.4:

else if (n >= NCPU || cells != as)
    collide = false;            // At max size or stale

当cells数组的长度大于等于cpu的核数或者cells != as设置collide = false表明没有扩容意向。这里超过cpu的核数不再扩容是因为cpu的核数代表计算机的处理能力,当超过cpu的核数时,多出来的cell单元格没太大作用,反而占用空间。

接着看CASE1.5:

else if (!collide)
    collide = true;

如果扩容意向为false,就将collide设为true,之后执行advanceProbe方法重置哈希值。这里的操作是为了当collide为false时就不再执行后面的CASE1.6扩容操作了。

接着看CASE1.6

else if (cellsBusy == 0 && casCellsBusy()) {
    try {
        if (cells == as) {      // Expand table unless stale
            Cell[] rs = new Cell[n << 1];
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;                   // Retry with expanded table
}

这里面其实是扩容逻辑,首先判断当前有无线程加锁,如果没有线程加锁那就通过casCellsBusy()方法尝试加锁,加锁成功之后将cellsBusy设为1,里面有一个if语句if (cells == as)是为了判断当前的cells数组和原来的数组是不是同一个(防止其他线程已经扩容过了),之后扩容为原来数组的两倍,之后将旧数组中的值拷贝到新数组中去,设置cellsBusy为0释放锁,设置collide为false表明已经没有扩容意向了,之后自旋

LongAdder的sum()方法

当我们使用LongAdder作为计数器时,需要调用sum()方法来汇总数据

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

内部实现就是将base所有不为null的cell单元格内数值求和

4. 总结

LongAdder源码解析

LongAdder的核心思想就是空间换时间,将一个热点数据分散成cells数组从而减小冲突,以此来提升性能。

如果对于并发要求不高但是对精确要求很高的情况下还是建议使用AtomicLong

如果对于并发要求很高但是对精确要求不高的情况下建议使用LongAdder(因为在高并发下如果有并发更新,则利用sum汇总时数据可能不准确)

文章学习与:
https://blog.csdn.net/u012881584/article/details/106133349
https://www.bilibili.com/video/BV1KE411K7Ts?p=3&spm_id_from=333.1007.top_right_bar_window_history.content.click
上一篇:关于excel操作的一些记录


下一篇:spread 15.0 for wpf