LongAdder与高并发场景下的qps计算

高并发场景下的qps计算

qps通常可以定义为:对一秒内该进程中的各个线程对某一个切入点的次数进行累加计数。

业务场景

公司层面已经有一套针对服务的SLA监控。但我负责的这个服务是一套地理位置服务,属于公共支撑类服务,因此该服务要对接多个业务方。为了知道上线后,各个业务方的qps以及天级流量,我们就需要孵化出一个工具对这两个metric行统计。

术语定义

  • bucket:桶,描述单位时间内的请求数量
  • bucketWindow:时间窗口,描述需要统计时间窗口的数据,一个时间窗口包含多个桶。例如:需要统计qps,那么统计的时间窗口为1000ms

核心设计

LongAdder与高并发场景下的qps计算

public class Bucket {

    private final LongAdder count;

    private final LongAdder rt;

    /**
     * 最小耗时
     */
    private volatile long minRt = Integer.MAX_VALUE;

    /**
     * 最大耗时
     */
    private volatile long maxRt = Integer.MIN_VALUE;

    public Bucket() {
        this.count = new LongAdder();
        this.rt = new LongAdder();
    }

    public void add(long n) {
        count.add(n);
    }

    public long sum() {
        return count.sum();
    }

    public long sumRt() {
        return rt.sum();
    }

    public void reset() {
        count.reset();
    }

    public void addRt(long currentRt) {
        if (currentRt < minRt) {
            minRt = currentRt;
        }
        if (currentRt > maxRt) {
            maxRt = currentRt;
        }
        rt.add(currentRt);
    }

    public long sumThenReset() {
        return count.sumThenReset();
    }

    public long getMinRt() {
        return minRt;
    }

    public long getMaxRt() {
        return maxRt;
    }
}
public class BucketWrap<T> {

    /**
     * 每个bucket存储桶的时间长度(毫秒)
     */
    private final long bucketLengthInMs;

    /**
     * bucket的开始时间戳(毫秒)
     */
    private long bucketStartTime;

    private T bucket;

    public BucketWrap(long bucketLengthInMs, long bucketStartTime, T bucket) {
        this.bucketLengthInMs = bucketLengthInMs;
        this.bucketStartTime = bucketStartTime;
        this.bucket = bucket;
    }

    public long getBucketStartTime() {
        return bucketStartTime;
    }

    public long getBucketLengthInMs() {
        return bucketLengthInMs;
    }

    public T getBucket() {
        return bucket;
    }

    /**
     * 把bucket开始时间设置为startTime
     *
     * @param startTime
     * @return
     */
    public BucketWrap<T> resetWindowStart(long startTime) {
        this.bucketStartTime = startTime;
        return this;
    }

    /**
     * time是否在当前bucket中
     *
     * @param time
     * @return
     */
    public boolean timeInWindow(long time) {
        return bucketStartTime <= time && time < bucketStartTime + bucketLengthInMs;
    }
}
public class BucketsWindow {

    /**
     * bucket时间长度
     */
    private long bucketLengthInMs;

    /**
     * 样本数量
     */
    private int sampleCount;

    /**
     * 窗口时间
     */
    private long windowIntervalInMs;

    /**
     * 数据
     */
    protected final AtomicReferenceArray<BucketWrap<Bucket>> atomicReferenceArray;

    /**
     * 更新锁
     */
    private final ReentrantLock updateLock = new ReentrantLock();

    public BucketsWindow(int sampleCount, long windowIntervalInMs) {
        this.sampleCount = sampleCount;
        this.windowIntervalInMs = windowIntervalInMs;
        this.bucketLengthInMs = windowIntervalInMs / sampleCount;
        this.atomicReferenceArray = new AtomicReferenceArray<>(sampleCount);
    }

    /**
     * 废弃的bucket
     *
     * @param currentTime
     * @param bucketWrap
     * @return
     */
    private boolean discardedBucket(long currentTime, BucketWrap<Bucket> bucketWrap) {
        return currentTime - bucketWrap.getBucketStartTime() > windowIntervalInMs;
    }

    /**
     * 获取当前时间的窗口
     *
     * @return
     */
    public BucketWrap<Bucket> getCurrentBucket() {
        return getCurrentBucket(System.currentTimeMillis());
    }

    public List<Bucket> getBuckets() {
        return getBuckets(System.currentTimeMillis());
    }

    private List<Bucket> getBuckets(long currentTime) {
        if (currentTime < 0) {
            throw new IllegalArgumentException(String.format("current time is illegal,currentTime:%d", currentTime));
        }
        List<Bucket> buckets = Lists.newArrayList();
        int length = atomicReferenceArray.length();
        for (int i = 0; i < length; i++) {
            BucketWrap<Bucket> bucketWrap = atomicReferenceArray.get(i);
            if (null == bucketWrap || discardedBucket(currentTime, bucketWrap)) {
                continue;
            }
            buckets.add(bucketWrap.getBucket());
        }
        return buckets;
    }

    /**
     * 计算窗口下标
     *
     * @param currentTime
     * @return
     */
    private int calculateWindowIndex(long currentTime) {
        return (int) ((currentTime / bucketLengthInMs) % sampleCount);
    }

    /**
     * 计算窗口的开始时间戳
     *
     * @param currentTime
     * @return
     */
    private long calculateWindowStartTime(long currentTime) {
        return currentTime - currentTime % bucketLengthInMs;
    }

    /**
     * 重置窗口
     *
     * @param oldBucketWrap
     * @param startTime
     * @return
     */
    private BucketWrap<Bucket> resetWindow(BucketWrap<Bucket> oldBucketWrap, long startTime) {
        oldBucketWrap.resetWindowStart(startTime);
        oldBucketWrap.getBucket().reset();
        return oldBucketWrap;
    }

    /**
     * 获取当前窗口
     *
     * @param currentTime
     * @return
     */
    private BucketWrap<Bucket> getCurrentBucket(long currentTime) {
        if (currentTime < 0) {
            throw new IllegalArgumentException(String.format("current time is illegal,currentTime:%d", currentTime));
        }
        int windowIndex = calculateWindowIndex(currentTime);
        long bucketStartTime = calculateWindowStartTime(currentTime);

        for (; ; ) {
            BucketWrap<Bucket> bucketWrap = atomicReferenceArray.get(windowIndex);
            if (null == bucketWrap) {
                BucketWrap<Bucket> tempWindowWrap = new BucketWrap<>(bucketLengthInMs,
                        bucketStartTime, new Bucket());
                // cas设置window
                if (atomicReferenceArray.compareAndSet(windowIndex, null, tempWindowWrap)) {
                    return tempWindowWrap;
                }
            } else if (bucketStartTime == bucketWrap.getBucketStartTime()) {
                return bucketWrap;
            } else if (bucketStartTime > bucketWrap.getBucketStartTime()) {// 复用旧的bucket
                if (updateLock.tryLock()) {
                    try {
                        return resetWindow(bucketWrap, bucketStartTime);
                    } finally {
                        updateLock.unlock();
                    }
                }
            } else {
                return new BucketWrap<>(bucketLengthInMs, bucketStartTime, new Bucket());
            }
        }
    }

    public long getBucketLengthInMs() {
        return bucketLengthInMs;
    }

    public int getSampleCount() {
        return sampleCount;
    }

    public long getWindowIntervalInMs() {
        return windowIntervalInMs;
    }
}
public interface Flow {

    /**
     * 增加
     *
     * @param rt 耗时
     */
    void increaseRT(long rt);

    /**
     * 增加
     *
     * @param count 数量
     */
    void increase(long count);

    /**
     * 增加
     */
    void increase();

    /**
     * 总请求数量
     *
     * @return 总请求数量
     */
    long totalCount();

    /**
     * 平均耗时
     *
     * @return 平局耗时
     */
    long averageRt();

    /**
     * 最小耗时
     *
     * @return 最小耗时
     */
    long minRt();

    /**
     * 最大耗时
     *
     * @return 最大耗时
     */
    long maxRt();

    /**
     * 获取所有滑动窗口
     *
     * @return
     */
    List<Bucket> buckets();
}
public class SecondFlow extends BaseFlow {

    public SecondFlow() {
        // bucket为10ms
        super(new BucketsWindow(100, 1000));
    }
}

public class MinuteFlow extends BaseFlow {

    public MinuteFlow() {
        // bucket为100ms
        super(new BucketsWindow(600, 60 * 1000));
    }
}

public class HourFlow extends BaseFlow {

    public HourFlow() {
        // bucket为1秒
        super(new BucketsWindow(3600, 60 * 60 * 1000));
    }
}
class SecondFlowTest {

    public static void main(String[] args) {
        SecondFlow secondFlow = new SecondFlow();
        new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    secondFlow.increase(1);
                    Thread.currentThread().sleep(2);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    Thread.currentThread().sleep(1200);
                    System.out.println(secondFlow.totalCount());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

LongAdder

在jdk 1.8中,java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder。LongAdder在高并发的场景下,比AtomicLong具有更好的性能,但会消耗更多的内存空间。

AtomicLong的瓶颈

AtomicLong利用了底层的CAS来提供并发场景下的原子操作的:

/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

上面的incrementAndGet吊用了底层的getAndAddLong方法,getAndAddLong方法内部调用的是native方法,采用自旋的方式不断地更新目标值,直至更新成功。
若在高并发环境下,N个线程同时进行自旋操作,会出现大量的自旋失败并不断自旋的情况。此时,自旋便成了AtomicLong的瓶颈。
因此,jdk1.8便在java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder。它在高并发的场景下会比AtomicLong更好的性能,代价是消耗更多的内存、不精确的计数。

LongAdder使用场景

  • LongAdder适合使用在对计数的精确度要求不高,在高并发下有较好的性能表现侧场景
  • AtomicLong适合使用在计数无误差,性能要求不高的场景

LongAdder与AtomicLong的性能比较

Java 8 Performance Improvements: LongAdder vs AtomicLong

LongAdder原理

在高并发环境下,AtomicLong是多个线程对一个热点值进行的cas操作。单热点的cas操作必然会造成cas的大量失败。
Doug Lea基于此缺点,创造出了LongAdder。LongAdder的基本思路就是分散热点值,将热点值分散到一个数组中。这个数组的每个元素,我们可以称它为槽。在高并发场景中,不同的线程会命中到不同的槽中,各个线程只会对自己槽中的值进行cas槽操作。那么,热点值就被分散了,cas冲突的概率也大大地减少了。
接下来,我们来看看LongAdder的api以及内部结构、核心方法。

LongAdder Api

Modifier and Type Method Description
void add(long x) Adds the given value.
void decrement() Equivalent to add(-1).
double doubleValue() Returns the sum() as a double after a widening primitive conversion.
float floatValue() Returns the sum() as a float after a widening primitive conversion.
void increment() Equivalent to add(1).
int intValue() Returns the sum() as an int after a narrowing primitive conversion.
long longValue() Equivalent to sum().
void reset() Resets variables maintaining the sum to zero.
long sum() Returns the current sum.
long sumThenReset() Equivalent in effect to sum() followed by reset().
String toString() Returns the String representation of the sum().

从上述api来看,LongAdder提供了增减、获取值、重置等原子操作。

内部结构

LongAdder与高并发场景下的qps计算
从上图中,我们可以看出LongAdder继承了Striped64。
Striped64是实现LongAdder的核心class,其定义了内部结构:


/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

cells:我们上面提到的数组,用来存储被打散的热点值。cells中的每个元素被成为槽。
base:基数。会在以下情况下使用:当未遇到并发竞争单一热点值的情况下,直接使用base值进行累加;当遇到多线程竞争单一热点时,需要初始化cell数组,以达到打散热点值的目的。但cell数组只能被初始化一次。因此,其他竞争失败的线程也会将值累加到base上。
接下来,我们来看看Cell的结构:

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell也被称为槽,它是AtomicLong的变体,仅支持cas方式的更新值。其value字段用来存储计数的值;cas方法用来实现原子更新。

经过上面的原理以及字段介绍,我们能猜出LongAdder最终的值的计算方式为:

value = base + \sum_{i=0}^n Cell[i]

例如:有5个线程thread-0、thread-1、thread-2、thread-3,thread-4,thread-0在第一秒调用了LongAdder的add(5);thread-1、thread-2同时在第二秒调用了LongAdder的add方法,分别加了10、20;thread-3,thread-4同时在第三秒调用了LongAdder的add方法,分别加了30、40。
对于LongAdder,最终的值是这样计算出来的:

  • thread-0调用add(5),由于没有并发,base=5
  • thread-1、thread-2同时调用。由于需要初始化cell数组,此时thread-2的值也会加到base中,base=10+5=15;cell[x2] = 20;
  • thread-3,thread-4也会被打散到cell数组中,cell[x3]=30,cell[x4]=40
    因此,value= (10 + 5) + (20 + 30 + 40) = 105

核心方法

add

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

/**
 * CASes the base field.
 */
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

可以看出,若多个线程线性执行,那么caseBase方法不会失败,值直接累加到base中即可。
若出现多线程竞态的情况下,caseBase方法将会失败,代码将进入if方法块;然后会判断Cell[]是否初始化过,若Cell[]初始化了,后续所有的cas操作都只会针对cell,不再累加到base;若未初始化,代码将进入longAccumulate方法;
更详细的描述可以见以下流程图:
LongAdder与高并发场景下的qps计算

接下来,我们来看看longAccumulate做了啥。

longAccumulate

上述可知,longAccumulate主要做两件事:cell[]未初始化时,对其进行初始化,并根据当前线程的hash值计算出index,并在cell[index]中创建对应的槽;cell已经初始化的情况下,当前线程cas更新失败,则出现冲突,进行扩容并rehash到指定槽位。

/**
 * Handles cases of updates involving initialization, resizing,
 * creating new Cells, and/or contention. See above for
 * explanation. This method suffers the usual non-modularity
 * problems of optimistic retry code, relying on rechecked sets of
 * reads.
 *
 * @param x the value
 * @param fn the update function, or null for add (this convention
 * avoids the need for an extra field or function in LongAdder).
 * @param wasUncontended false if CAS failed before call
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {// CASE 1
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//Case 2
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x)))) // Case 3
            break;                          // Fall back on using base
    }
}

上述代码先给当前线程分配一个has值,然后进入自旋,自旋分3个分支:

  1. Case 1:cell[]已经初始化
  • 当前线程所在的槽,就占有占有cellsBusy锁并在索引位置创建一个槽
  • CAS尝试更新当前线程所在的槽,如果成功就返回,如果失败说明出现冲突
  • 当前线程更新槽失败后并不是立即扩容,而是尝试更新probe值后再重试一次
  • 如果在重试的时候还是更新失败,就扩容;扩容时当前线程占有cellsBusy锁,并把数组容量扩大到原来的两倍,再copy原cell[]中元素到新数组中,原cell[]索引位置不变。使用新扩容的cell重新尝试更新
  • 在longAccumulate()方法中有个条件是n >= NCPU就不会走到扩容逻辑了,而n是2的倍数,那是不是代表cell[]的长度最大只能达到NCPU?
    同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不同的核心更新了同一个Cell,这时会重新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大概率会发生改变。如果运行的时间足够长,最终会出现同一个核心的所有线程都会hash到同一个Cell(大概率,但不一定全在一个Cell上)上去更新,所以,这里cells数组中长度并不需要太长,达到CPU核心数足够了。
    例如,电脑是8核的,所以这里cell[]最大只会到8,达到8就不会扩容了。
  1. Case 2:cell[]未初始化
    当前线程会尝试占有cellsBusy锁并创建cell[],根据当前线程的hash值计算映射的索引,创建对应的槽,累加本次对应的值
  2. Case 3:cell[]正在初始化中
    当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base。

参考

https://segmentfault.com/a/1190000015865714
https://www.cnblogs.com/tong-yuan/p/LongAdder.html

上一篇:LongAdder


下一篇:比AtomicInteger更优的多线程计数类:LongAdder原理分析