Java8原子弹类之LongAdder源码分析
JDK 8开始,针对Long型的原子操作, Java又提供了LongAdder. LongAccumulator; 针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。
Striped64 UML
AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发场景下仍不够快,若再提高性能,咋办?
把一个变量拆成多份,变为多个变量,类似ConcurrentHashMap分段锁。如下图,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量。当多个线程并发累加的时:
- 如果并发度低,就直接加到base变量上
- 并发度高,冲突大,平摊到这些Cell上
最后取值时,再把base和这些Cell求sum运算。
核心 API
add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 判断cells是否还没被初始化,并且尝试对value值进行cas操作
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 如果cells已经初始化 或 cas操作失败,则运行if内部语句
boolean uncontended = true;
// cell[]数组是否初始化
// cell[]数组虽然初始化了但是数组长度是否为0
// 该线程所对应cell是否为null
// 尝试对该线程对应的cell单元进行cas更新是否失败,如果这些条件有一条为true,则运行最为核心的方法longAccumulate
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
包含一个Cell数组,Striped64
的一个内部类。即AtomicLong
的填充变体且只支持原始访问和CAS,有一个value变量,并且提供cas方法更新value值。
/**
* 处理涉及初始化,调整大小,创建新Cell,和/或争用的更新案例
*
* @param x 值
* @param fn 更新方法
* @param wasUncontended 调用
*/
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 获取线程probe的值
if ((h = getProbe()) == 0) {
// 值为0,则强制初始化
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempt
for (;;) {
Cell[] as; Cell a; int n; long v;
// 这个if分支处理上述四个条件中的前两个
// 此时cells数组已初始化 && 长度大于0
if ((as = cells) != null && (n = as.length) > 0) {
// 线程对应cell为null
if ((a = as[(n - 1) & h]) == null) {
// 若busy锁未被占有
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个cell
Cell r = new Cell(x); // Optimistically create
// 检测busy是否为0,并且尝试锁busy
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 再次确认线程probe所对应的cell为null,将新建的cell赋值
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (created)
break;
//如果失败,再次尝试
continue; // Slot is now non-empty
}
}
collide = false;
}
//置为true后交给循环重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//尝试给线程对应的cell update
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//在以上条件都无法解决的情况下尝试扩展cell
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//此时cells还未进行第一次初始化,进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//busy锁不成功或者忙,则再重试一次casBase对value直接累加
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 通过cas实现的自旋锁,用于扩大或者初始化cells
*/
transient volatile int cellsBusy;
从以上分析来看,longAccumulate
就是为了尽量减少多个线程更新同一个value,实在不行则扩大cell
SUM(最终一致性问题)
并没有对 cells 数组加锁,所以是最终一致性,而非强一致性。类似 concurrenthashmap#clear(一边执行清空操作,一边还有线程放入数据,clear调用完完毕后再读取)。因此,适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景。
/**
* Returns the current sum. 返回值不是个原子快照;无并发修改的调用可以返回精确值,但当计算sum时有并发修改,就可能无法正常协作了。
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
LongAdder
减少冲突的方法以及在求和场景下比AtomicLong
更高效。
因为LongAdder
在更新数值时并非对一个数进行更新,而是分散到多个cell,这样在多线程的情况下可以有效的嫌少冲突和压力,使得更加高效。
由于无论是long or double,都是64位。但因没有double型的CAS操作,所以是通过把double型转化
成long型来实现。所以,上面的base和Cell[]变量,位于基类Striped64。英文Striped意为“条带”, 即分片。
使用场景
适用于统计求和计数的场景,因为它提供add
、sum
方法。
LongAdder是否能够替换AtomicLong
不行,因为AtomicLong
提供了很多cas方法,例如getAndIncrement
、getAndDecrement
等,使用起来非常灵活,而LongAdder
只有add
和sum
,使用较受限。
优点:由于 JVM 会将 64位的double,long 型变量的读操作分为两次32位的读操作,所以低并发保持了 AtomicLong性能,高并发下热点数据被 hash 到多个 Cell,有限分离,通过分散提升了并行度
但统计时有数据更新,也可能会出现数据误差,但高并发场景有限使用此类,低时还是可以继续 AtomicLong
伪共享与缓存行填充
Cell类定义用了注解 Contended,JDK 8新增,涉及伪共享与缓存行填充。
每个CPU都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line (缓存行)。在64位x86架
构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。
如下,主内存中有变量X、Y、 Z (假设每个变量都是一个Long型) ,被CPU1和CPU2分别读入自己的缓
存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,即往总线上发消息,通知CPU2对应Cache Line也失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。
使用Contended注解,即可实现缓存行填充。不让Cell数组中相邻的元素落到同一个缓存行。