并发编程--计数器不同实现方案性能对比【synchronized、LongAdder、LongAccumulator、AtomicLong】

在并发编程时,经常需要进行计数,如统计处理的记录条数、成功处理的条数、失败处理的条数等,本文针对synchronized、LongAdder、LongAccumulator、AtomicLong四种方案实现的计数器进行性能对比,并给出使用建议。

1. 计数器实现

1.1. 实现方案及测试结果

分别使用synchronized、AtomicLong、LongAdder、LongAccumulator方案,实现计数器,实现代码参见下面的代码。

性能测试结果详见下面的输出结果,LongAdder和LongAccumulator性能相当,synchronized性能最差,AtomicLong的耗时约为synchronized一半,LongAdder和LongAccumulator的耗时约为前两个的几十分之一。

1.2. 结果分析

AtomicLong是JDK1.5开始出现的,里面主要使用了一个long类型的value作为成员变量,然后使用循环的CAS操作去操作value的值,并发量比较大的情况下,CAS操作失败的概率较高,内部失败了会重试,导致耗时可能会增加。

LongAdder是JDK1.8开始出现的,所提供的API基本上可以替换掉原先的AtomicLong。LongAdder在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字,然后交给多个人去管控,每个管控者负责保证部分数字在多线程情况下操作的正确性。当多线程访问的时,通过hash算法映射到具体管控者去操作数据,最后再汇总所有的管控者的数据,得到最终结果。相当于降低了并发情况下锁的粒度,所以效率比较高,看一下下面的图,方便理解:

并发编程--计数器不同实现方案性能对比【synchronized、LongAdder、LongAccumulator、AtomicLong】

LongAccumulator是LongAdder的功能增强版。LongAdder的API只有对数值的加减,而LongAccumulator提供了自定义的函数操作,其构造函数如下:

    /**
     * Creates a new instance using the given accumulator function
     * and identity element.
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @param identity identity (initial value) for the accumulator function
     */
    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

LongAdder、LongAccumulator全面超越同步锁及AtomicLong的方式,建议在使用AtomicLong的地方可以直接替换为LongAdder、LongAccumulator,吞吐量更高一些

2. 计数器不同的实现方案

模拟50个线程,每个线程对计数器递增100万次,最终结果应该是5000万。

分别使用synchronized、LongAdder、LongAccumulator、AtomicLong四种实现方案,并进行性能测试。

2.1 实现代码

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

public class CounterTest {
    private static ICounter counter;

    public static void main(String[] args) throws Exception {
        for (int j = 1; j < 5; j++) {
            counter = CounterFactory.getCounter(j);
            System.out.println(String.format("基于%s进行计数", counter.getClass().getSimpleName()));
            for (int i = 0; i < 10; i++) {
                counter.reset();
                count(counter);
            }
        }
    }

    private static void count(ICounter counter) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        int threadCount = 50;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 1000000; j++) {
                        counter.incr();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        countDownLatch.await();
        long t2 = System.currentTimeMillis();
        System.out.println(String.format("结果:%s,耗时(ms):%s", counter.get(), (t2 - t1)));
    }
}

interface ICounter {
    public void reset();

    public void incr();

    public long get();
}

class SynchronizedCounter implements ICounter {
    private Integer count = 0;
    private Object lock = new Object();

    @Override
    public void reset() {
        count = 0;
    }

    @Override
    public void incr() {
        synchronized (lock) {
            count++;
        }
    }

    @Override
    public long get() {
        return count;
    }
}

class AtomicLongCounter implements ICounter {
    private AtomicLong count = new AtomicLong(0);

    @Override
    public void reset() {
        count.set(0);
    }

    @Override
    public void incr() {
        count.incrementAndGet();
    }

    @Override
    public long get() {
        return count.get();
    }
}

class LongAdderCounter implements ICounter {
    private LongAdder count = new LongAdder();

    @Override
    public void reset() {
        count.reset();
    }

    @Override
    public void incr() {
        count.increment();
    }

    @Override
    public long get() {
        return count.sum();
    }
}

class LongAccumulatorCounter implements ICounter {
    private LongAccumulator count = new LongAccumulator((x, y) -> x + y, 0L);

    @Override
    public void reset() {
        count.reset();
    }

    @Override
    public void incr() {
        count.accumulate(1);
    }

    @Override
    public long get() {
        return count.longValue();
    }
}

class CounterFactory {
    public static ICounter getCounter(int counterType) throws Exception {
        switch (counterType) {
            case 1:
                return new SynchronizedCounter();
            case 2:
                return new AtomicLongCounter();
            case 3:
                return new LongAdderCounter();
            case 4:
                return new LongAccumulatorCounter();
        }
        throw new Exception("参数错误");
    }
}

2.2. 输出结果


基于SynchronizedCounter进行计数
结果:50000000,耗时(ms):1451
结果:50000000,耗时(ms):1452
结果:50000000,耗时(ms):1397
结果:50000000,耗时(ms):1406
结果:50000000,耗时(ms):1408
结果:50000000,耗时(ms):1411
结果:50000000,耗时(ms):1422
结果:50000000,耗时(ms):1415
结果:50000000,耗时(ms):1434
结果:50000000,耗时(ms):1430
基于AtomicLongCounter进行计数
结果:50000000,耗时(ms):777
结果:50000000,耗时(ms):752
结果:50000000,耗时(ms):756
结果:50000000,耗时(ms):768
结果:50000000,耗时(ms):757
结果:50000000,耗时(ms):761
结果:50000000,耗时(ms):769
结果:50000000,耗时(ms):784
结果:50000000,耗时(ms):776
结果:50000000,耗时(ms):772
基于LongAdderCounter进行计数
结果:50000000,耗时(ms):60
结果:50000000,耗时(ms):53
结果:50000000,耗时(ms):54
结果:50000000,耗时(ms):52
结果:50000000,耗时(ms):55
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):53
结果:50000000,耗时(ms):55
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):51
基于LongAccumulatorCounter进行计数
结果:50000000,耗时(ms):55
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):52
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):52
结果:50000000,耗时(ms):52
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):51
结果:50000000,耗时(ms):50

上一篇:Caught exception [Exec exit status not zero. Status [1]] Sleeping for [10,000]ms before trying again


下一篇:路由双向引入引发的环路与次优路径及解决方案-pro