原子累加器 :LongAdder

LongAdder

LongAdder是java8为我们提供的新的类,跟AtomicLong有相同的效果。是对CAS机制的优化

继承于Striped64 ,其子类还有LongAccumulator,DoubleAdder,DoubleAccumulator

LongAccumulator 是LongAdder的扩展,让用户自定义累加规则。可以传入函数式接口

DoubleAdder操作浮点型,DoubleAccumulator 是DoubleAdder的扩展,让用户自定义累加规则。可以操作函数式接口

public class LongAdderTest {

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(() -> new AtomicLong(0), AtomicLong::getAndIncrement);
        }

        for (int i = 0; i < 5; i++) {
            demo(LongAdder::new, LongAdder::increment);
        }
    }

    /*
   () -> 结果    提供累加器对象
   (参数) ->     执行累加操作
    */
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        List<Thread> ts = new ArrayList<>();
        // 4 个线程,每人累加 50 万
        for (int i = 0; i < 4; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start) / 1000_000);
    }
}

原因:CAS底层实现是在一个死循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈的时候,修改成功率很高,但是如果竞争激烈的时候,失败率很高。在失败的时候,这些重复的原子性操作会耗费性能。(不停的自旋,进入一个无限重复的循环中)

longAdder 在有竞争时,设置多个累加单元, Therad-0累加celo,而 Thread-累加 cell[0],Thread-1 累加Cell[1]… 最后将结果汇总最后将结果汇总。这样它们在累加时操作的不同的cell变量,因此减少了CAS重试失败,从而提 高性能

累加单元和cpu的核数有关,竞争激烈时,设置多个累加单元,核心数越多提示效果越明显,但不会超过cpu核心数

LongAdder原理

AtomicLong和LongAdder的临界区数据对比图
原子累加器 :LongAdder
LongAdder的基本思路就是*分散热点*,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。–获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁,保护cell[]数组的安全,创建扩容
transient volatile int cellsBusy; 

cell累加单元

其父类Striped64 定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过Unsafe来CAS操作它的值:
原子累加器 :LongAdder

@jdk.internal.vm.annotation.Contended

缓存行伪共享注解

cpu需要把内存的数据读到缓存提升效率
原子累加器 :LongAdder
CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离 cpu 越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的同一缓存行中,CPU 要保证数据的一致性要确保每个缓存中使用的共享变量的副本是一致的,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效(缓存一致性协议(MESI协议:当某个CPU在写数据时,如果发现操作的变量是共享变量,则会通知其他CPU告知该变量的缓存行是无效的,因此其他CPU在读取该变量时,发现其无效会重新从主存中加载数据。);一旦对应缓存行失效cpu就会从内存中读取最新的值,这样带来了效率的降低;为了解决这样问题使用缓存行伪共享注解在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

原子累加器 :LongAdder
使用缓存行伪共享注解在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

伪共享:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能

add

public void add(long x) {
        // cs 为累加单元数组, b 为基础值, x 为累加值
        Cell[] cs; long b, v; int m; Cell c;
        // 进入 if 的两个条件
        // 1. cs 有值, 表示已经发生过竞争, 进入 if
        // 2. cas 给 base 基础累加时失败了, 表示 base 发生了竞争, 进入 if
        // 3. 如果 cs 没有创建, 然后 cas 累加成功就返回,累加到 base 中 不存在线程竞争的时候用到。
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
          
            int index = getProbe();
            boolean uncontended = true;
              // uncontended 表示 cell 是否有竞争,这里赋值为 true 表示有竞争
            if (
                // cs 还没有创建
                    cs == null || (m = cs.length - 1) < 0 ||
                            // 当前线程对应的 cell 还没有被创建,a为当线程的cell
                            (c = cs[index & m]) == null ||
       // 给当前线程的 cell 累加失败 uncontended=false ( c为当前线程的 cell )
                            !(uncontended = c.cas(v = c.value, v + x))
            ) {
                // 当 cells 为空时,累加操作失败会调用方法,
                // 当 cells 不为空,当前线程的 cell 创建了但是累加失败了会调用方法,
                // 当 cells 不为空,当前线程 cell 没创建会调用这个方法
                // 进入 cell 数组创建、cell 创建的流程
                longAccumulate(x, null, uncontended,index);
            }
        }
    }
 

原子累加器 :LongAdder

longAccumulate() :无限循环对cells数组进行操作更新。如果对应的cell为空则cas创建cell并插入,如果不为空则cas修改其value值。如果cas修改失败则扩容,但是扩容最大值是CPU核数。

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended,int index) {
        
        // 当前线程还没有对应的 cell
        if ((index == 0) {
             
            ThreadLocalRandom.current();
            // 需要最新的随机值用来将当前线程绑定到 cell 对应新的 probe 值 
             index = getProbe();
            wasUncontended = true;
        }
        // collide 为 true 表示需要扩容
        
        for (boolean collide = false;;) {
            Cell[] cs; Cell c; int n; long v;
            // 已经有了 cells说明另外一个线程进来了
            if ((cs = cells) != null && (n = cs.length) > 0) {
                // 但是还没有当前线程对应的 cell累加单元,为null说明该线程还没有创建累加单元
                if ((c = cs[(n - 1) & index]) == null) {
                    // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
                    // 成功则 break, 否则继续 continue 循环
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {//没有上锁且cas 上锁成功 
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                //这里使用double check 减少cas的竞争,很多源码底层都使用此种思想
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    // 判断槽位确实是空的
                                    rs[j = (m - 1) & index] == null) {
                                    rs[j] = r;
                                    break;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                             
                            continue;           // Slot is now non-empty
                        }
                }
                // 有竞争, 改变线程对应的 cell 来重试 cas
                else if (!wasUncontended)
                    wasUncontended = true;
                    // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
                else if (c.cas(v = c.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;
                    // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
                else if (n >= NCPU || cells != cs)
                    collide = false;
                    // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
                else if (!collide)
                    collide = true;
                    // 加锁
                else if (cellsBusy == 0 && casCellsBusy()) {
                    // 加锁成功, 扩容及
                     try {
                        if (cells == cs)        // 确保cs 的引用没有更改
                            cells = Arrays.copyOf(cs, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                       
                    continue;
                }
                // 改变线程对应的 cell
                index = advanceProbe(index);
            }
            // 还没有 cells, cells==as是指没有其它线程修改cells,as和cells引用相同的对象,cellsBusy=0说明没有线程持有锁             // 使用casCellsBusy()尝试给 cellsBusy 加锁
            else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
                // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
                // 成功则 break
                try {                           // Initialize table
                    if (cells == cs) {
                        Cell[] rs = new Cell[2];
                        rs[index & 1] = new Cell(x);
                        cells = rs;
                        break;
                    }
                } finally {
                    cellsBusy = 0;//使用完释放锁标志位置为true
                }
                 
            }
            // 上两种情况失败, 尝试给 base 使用casBase累加
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
        }
    }

可以看到,只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。
如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了

上一篇:Epplus c# to excel 的 入门学习(一)


下一篇:细胞专刊 | PBMC/人骨髓单个核细胞