AtomicStampedReference源码解析(基于 JDK 1.8)

@

目录
在 atomic 包中,有三个 Reference 相关的类:AtomicReferenceAtomicStampedReferenceAtomicMarkableReference

AtomicReference 无法解决 ABA 问题,而后面的两个类特别是 AtomicStampedReference 能够很好地解决 ABA 问题。

ABA问题可以见我前面的文章 unsafe 介绍(二)与CAS

1 Stamped 和 Markable的比较

AtomicStampedReference 和 AtomicMarkableReference 都能处理 ABA 问题,区别如下:

AtomicStampedReference 内部的 Pair 类,除了包含数据的 reference,还有一个 int 型的 stamp。

AtomicMarkableReference 结构类似,除了 reference 还有一个 boolean 的 mark。

AtomicStampedReference 的版本最多和不同的 Integer 个数一样多,即 \(2^{32}\),而 AtomicMarkableReference 的版本只有两种,很容易重复。

以下只分析 AtomicStampedReference。

public class AtomicStampedReference<V> {

    private static class Pair<T> {
        final T reference;
       	// 使用 stamp 标记
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
		 
    private volatile Pair<V> pair;
    ...
}

public class AtomicMarkableReference<V> {

    private static class Pair<T> {
        final T reference;
        // 使用 mark 标记
        final boolean mark;
        private Pair(T reference, boolean mark) {
            this.reference = reference;
            this.mark = mark;
        }
        static <T> Pair<T> of(T reference, boolean mark) {
            return new Pair<T>(reference, mark);
        }
    }

    private volatile Pair<V> pair;
}

2 介绍

正如前面所说,AtomicStampedReference 内部持有一个 Pair,而 Pair 其实包含数据 reference 和版本 stamp 的。

注意到 reference 和版本 stamp 都是 final 的,保证不可变。要修改只能调用静态的 of 生成一个新的 Pair,这保证了安全。

下面的 get 方法只返回 reference,stamp 会放在 stampHolder[0] 中。

// 初始化
public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }

    /**
     * Returns the current value of the reference.
     *
     * @return the current value of the reference
     */
    public V getReference() {
        return pair.reference;
    }

    /**
     * Returns the current value of the stamp.
     *
     * @return the current value of the stamp
     */
    public int getStamp() {
        return pair.stamp;
    }

    /**
     * Returns the current values of both the reference and the stamp.
     * Typical usage is {@code int[1] holder; ref = v.get(holder); }.
     *
     * @param stampHolder an array of size of at least one.  On return,
     * {@code stampholder[0]} will hold the value of the stamp.
     * @return the current value of the reference
     */
// 将 reference 返回, stamp 放入 stampHolder[0]
    public V get(int[] stampHolder) {
        Pair<V> pair = this.pair;
        stampHolder[0] = pair.stamp;
        return pair.reference;
    }

3 其他方法

下面的方法都调用 Unsafe,前文讲过,不再描述。

重点是 compareAndSet 和 attemptStamp,return 后面非常不好理解。

涉及的知识点是 && 的短路机制。如果有三个表达式 a,b,c,把它们组合成一个新的表达式 a && b && c,短路机制如下:

如果 a 为 false,结果为 false,不处理 b 和 c

如果 a 为 true,继续处理

 如果 b 为 false,结果为 false,不处理 c

 如果 b 为 true ,继续处理

  如果 c 为 false,返回 false

  如果 c 为 true ,返回 true

以 compareAndSet 为例,如果期望的 expectedReference 满足条件,才会判断期望的 expectedStamp;如果期望的 expectedStamp 满足条件,才执行第三个括号;如果没有执行到第三个括号就结束,说明期望值不管是 reference 还是 stamp 至少一个不满足条件,会返回 false。

第三个括号首先判断当前值是不是已经是新的值了,如果是,那就不用修改,返回 true即可;如果不是,则需要执行 CAS,并将 CAS 执行是否成功的情况返回。

这个做法是满足 CAS 的定义的,如果期望的值是相等的,才会尝试修改新值。只不过这里多判断了一步,如果新值和现在不一样才修改,减少开销。

attemptStamp 表示只有 expectedReference 满足条件时,才尝试比较新 stamp 或者修改,否则不会修改。

/**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * <p><a href="package-summary.html#weakCompareAndSet">May fail
     * spuriously and does not provide ordering guarantees</a>, so is
     * only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean weakCompareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
        return compareAndSet(expectedReference, newReference,
                             expectedStamp, newStamp);
    }

    /**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

    /**
     * Unconditionally sets the value of both the reference and stamp.
     *
     * @param newReference the new value for the reference
     * @param newStamp the new value for the stamp
     */
    public void set(V newReference, int newStamp) {
        Pair<V> current = pair;
       // 无条件修改,只要reference和stamp有一个不同就修改
        if (newReference != current.reference || newStamp != current.stamp)
            this.pair = Pair.of(newReference, newStamp);
    }

    /**
     * Atomically sets the value of the stamp to the given update value
     * if the current reference is {@code ==} to the expected
     * reference.  Any given invocation of this operation may fail
     * (return {@code false}) spuriously, but repeated invocation
     * when the current value holds the expected value and no other
     * thread is also attempting to set the value will eventually
     * succeed.
     *
     * @param expectedReference the expected value of the reference
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean attemptStamp(V expectedReference, int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            (newStamp == current.stamp ||
             casPair(current, Pair.of(expectedReference, newStamp)));
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long pairOffset =
        objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }

4 代码示例

两个线程 t1 和 t2。让 t1 先获取 stamp 后,让 t2 执行两次修改,t1 再执行一次修改,观察是否成功。

具体实现是通过 volatile boolean b 来控制前半段,在 t1 获取 stamp 前,t2 先空循环;通过在 t1 中使用 t2.join 等待 join 完成来控制后半段,将程序修改为串行执行。

最后的结果是 t2 修改失败,因为版本号已经改变了。

import java.util.concurrent.atomic.AtomicStampedReference;

public class A {

    private static AtomicStampedReference<Integer> atomicStampedRef =
            new AtomicStampedReference<>(1, 0);
    private static volatile boolean b = false;
    public static void main(String[] args){


        Thread t2 = new Thread(() -> {
            // 等待t1修改b后执行
            // b修改前先空循环
            while(!b) {}
            atomicStampedRef.compareAndSet(1, 2, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
            System.out.println("操作线程" + Thread.currentThread() + ",【increment】 ,值 = " + atomicStampedRef.getReference());
            atomicStampedRef.compareAndSet(2, 1, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
            System.out.println("操作线程" + Thread.currentThread() + ",【decrement】 ,值 = " + atomicStampedRef.getReference());

        },"t2");


        Thread t1 = new Thread(() -> {
            System.out.println("操作线程" + Thread.currentThread() +",初始值 a = " + atomicStampedRef.getReference());
            int stamp = atomicStampedRef.getStamp(); //获取当前标识
            b = true;
            try {
                // 等待 t2 结束
                t2.join();
            }catch(Exception e){
                e.printStackTrace();
            }

            boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1);  //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
            System.out.println("操作线程" + Thread.currentThread() +",CAS操作结果: " + isCASSuccess);
        },"t1");


        t1.start();
        t2.start();
    }
}

下面是我的公众号,Java与大数据进阶,分享 Java 与大数据笔面试干货,欢迎关注
AtomicStampedReference源码解析(基于 JDK 1.8)

上一篇:【Rust日报】2020-09-21 Rust宣布成立错误处理项目组


下一篇:多渠道打包工具Walle源码分析