@
目录在 atomic 包中,有三个 Reference 相关的类:AtomicReference,AtomicStampedReference,AtomicMarkableReference。
AtomicReference 无法解决 ABA 问题,而后面的两个类特别是 AtomicStampedReference 能够很好地解决 ABA 问题。
ABA问题可以见我前面的文章 unsafe 介绍(二)与CAS
1 Stamped 和 Markable的比较
AtomicStampedReference 和 AtomicMarkableReference 都能处理 ABA 问题,区别如下:
AtomicStampedReference 内部的 Pair 类,除了包含数据的 reference,还有一个 int 型的 stamp。
AtomicMarkableReference 结构类似,除了 reference 还有一个 boolean 的 mark。
AtomicStampedReference 的版本最多和不同的 Integer 个数一样多,即 \(2^{32}\),而 AtomicMarkableReference 的版本只有两种,很容易重复。
以下只分析 AtomicStampedReference。
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
// 使用 stamp 标记
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
...
}
public class AtomicMarkableReference<V> {
private static class Pair<T> {
final T reference;
// 使用 mark 标记
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
private volatile Pair<V> pair;
}
2 介绍
正如前面所说,AtomicStampedReference 内部持有一个 Pair,而 Pair 其实包含数据 reference 和版本 stamp 的。
注意到 reference 和版本 stamp 都是 final 的,保证不可变。要修改只能调用静态的 of 生成一个新的 Pair,这保证了安全。
下面的 get 方法只返回 reference,stamp 会放在 stampHolder[0] 中。
// 初始化
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
/**
* Returns the current value of the reference.
*
* @return the current value of the reference
*/
public V getReference() {
return pair.reference;
}
/**
* Returns the current value of the stamp.
*
* @return the current value of the stamp
*/
public int getStamp() {
return pair.stamp;
}
/**
* Returns the current values of both the reference and the stamp.
* Typical usage is {@code int[1] holder; ref = v.get(holder); }.
*
* @param stampHolder an array of size of at least one. On return,
* {@code stampholder[0]} will hold the value of the stamp.
* @return the current value of the reference
*/
// 将 reference 返回, stamp 放入 stampHolder[0]
public V get(int[] stampHolder) {
Pair<V> pair = this.pair;
stampHolder[0] = pair.stamp;
return pair.reference;
}
3 其他方法
下面的方法都调用 Unsafe,前文讲过,不再描述。
重点是 compareAndSet 和 attemptStamp,return 后面非常不好理解。
涉及的知识点是 && 的短路机制。如果有三个表达式 a,b,c,把它们组合成一个新的表达式 a && b && c,短路机制如下:
如果 a 为 false,结果为 false,不处理 b 和 c
如果 a 为 true,继续处理
如果 b 为 false,结果为 false,不处理 c
如果 b 为 true ,继续处理
如果 c 为 false,返回 false
如果 c 为 true ,返回 true
以 compareAndSet 为例,如果期望的 expectedReference 满足条件,才会判断期望的 expectedStamp;如果期望的 expectedStamp 满足条件,才执行第三个括号;如果没有执行到第三个括号就结束,说明期望值不管是 reference 还是 stamp 至少一个不满足条件,会返回 false。
第三个括号首先判断当前值是不是已经是新的值了,如果是,那就不用修改,返回 true即可;如果不是,则需要执行 CAS,并将 CAS 执行是否成功的情况返回。
这个做法是满足 CAS 的定义的,如果期望的值是相等的,才会尝试修改新值。只不过这里多判断了一步,如果新值和现在不一样才修改,减少开销。
attemptStamp 表示只有 expectedReference 满足条件时,才尝试比较新 stamp 或者修改,否则不会修改。
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean weakCompareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
return compareAndSet(expectedReference, newReference,
expectedStamp, newStamp);
}
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
/**
* Unconditionally sets the value of both the reference and stamp.
*
* @param newReference the new value for the reference
* @param newStamp the new value for the stamp
*/
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
// 无条件修改,只要reference和stamp有一个不同就修改
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}
/**
* Atomically sets the value of the stamp to the given update value
* if the current reference is {@code ==} to the expected
* reference. Any given invocation of this operation may fail
* (return {@code false}) spuriously, but repeated invocation
* when the current value holds the expected value and no other
* thread is also attempting to set the value will eventually
* succeed.
*
* @param expectedReference the expected value of the reference
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean attemptStamp(V expectedReference, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newStamp == current.stamp ||
casPair(current, Pair.of(expectedReference, newStamp)));
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
4 代码示例
两个线程 t1 和 t2。让 t1 先获取 stamp 后,让 t2 执行两次修改,t1 再执行一次修改,观察是否成功。
具体实现是通过 volatile boolean b 来控制前半段,在 t1 获取 stamp 前,t2 先空循环;通过在 t1 中使用 t2.join 等待 join 完成来控制后半段,将程序修改为串行执行。
最后的结果是 t2 修改失败,因为版本号已经改变了。
import java.util.concurrent.atomic.AtomicStampedReference;
public class A {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
private static volatile boolean b = false;
public static void main(String[] args){
Thread t2 = new Thread(() -> {
// 等待t1修改b后执行
// b修改前先空循环
while(!b) {}
atomicStampedRef.compareAndSet(1, 2, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【increment】 ,值 = " + atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2, 1, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【decrement】 ,值 = " + atomicStampedRef.getReference());
},"t2");
Thread t1 = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread() +",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //获取当前标识
b = true;
try {
// 等待 t2 结束
t2.join();
}catch(Exception e){
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() +",CAS操作结果: " + isCASSuccess);
},"t1");
t1.start();
t2.start();
}
}
下面是我的公众号,Java与大数据进阶,分享 Java 与大数据笔面试干货,欢迎关注