JDK源码阅读—AtomicInteger

简介

AtomicInteger是JUC中提供了原子更新操作的一个Integer类,Java中i++(–)、++(–)i、i+=x、i-=x等都不是原子操作,多线程环境下需要加锁来保证数据的正确性,而AtomicInteger可以在不加锁的前提下确保上述操作的原子性,在高并发的场景下可以比加锁有更好的性能。

AtomicInteger、AtomicBoolean、AtomicLong、AtomicReference四种类实现大致相同,只是数据类型不同,所以只对AtomicInteger进行分析。

以下分析基于corretto-1.8.0_282版本。

属性

unsafe

/**
 * Unsafe实例,AtomicInteger依赖Unsafe提供的CAS能力
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();

valueOffset

/**
 * value属性偏移量
 */
private static final long valueOffset;

value

/**
 * 储存实际值
 */
private volatile int value;

构造方法

AtomicInteger(int initialValue)

/**
 * 使用给定初始值实例化
 */
public AtomicInteger(int initialValue) {
    value = initialValue;
}

AtomicInteger()

/**
 * 使用默认初始值(0)实例化
 */
public AtomicInteger() {
}

静态初始化块

/**
 * 设置value字段的偏移
 */
static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

/**
 * 获取给定字段在对象中的偏移
 * 位于sun.misc.Unsafe
 */
public native long objectFieldOffset(Field f);

方法

compareAndSet(int expect, int update)

/**
 * 当旧值与期望值相同时,将其原子更新为新值
 * CAS
 */
public final boolean compareAndSet(int expect, int update) {
    // 实现位于sun.misc.Unsafe#compareAndSwapInt
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

compareAndSwapInt方法位于sun.misc.Unsafe文件中

/**
 * 如果一个Java变量当前与expected相等,将其原子更新为x
 * 成功返回true
 * 对应的C++实现位于hotspot/src/share/vm/prims/unsafe.cpp#1213处的
 * Unsafe_CompareAndSwapInt方法
 */
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

此方法是一个native方法,由C++实现,位于hotspot/src/share/vm/prims/unsafe.cpp文件中。

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

  // 前面都是一些准备工作,重要的是这一句,实际的CAS操作
  // 调用位于hotspot/src/share/vm/runtime/atomic.cpp#70处的unsigned Atomic::cmpxchg方法
  // 此方法会返回变量旧的值,若旧的值与期望值相等,则返回成功
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

继续调用Atomic类的cmpxchg方法,这个方法定义在hotspot/src/share/vm/runtime/atomic.cpp文件中

unsigned Atomic::cmpxchg(unsigned int exchange_value,
                         volatile unsigned int* dest, unsigned int compare_value) {
  assert(sizeof(unsigned int) == sizeof(jint), "more work to do");

  // 此处会根据操作系统类型和CPU架构选择不同的实现
  // 定义位于hotspot/src/share/vm/runtime/atomic.inline.hpp,此文件根据不同的宏定义,导入不同的实现
  // 例如:Windows系统x86架构会使用hotspot/src/os_cpu/windows_x86/vm/atomic_windows_x86.inline.hpp
  // Linux系统x86架构会使用hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp
  return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
                                       (jint)compare_value);
}

atomic.cpp引入了atomic.inline.hpp,在atomic.inline.hpp中,会根据操作系统和CPU架构选择cmpxchg方法不同的实现。

#ifndef SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP
#define SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP

#include "runtime/atomic.hpp"

// Linux
#ifdef TARGET_OS_ARCH_linux_x86
# include "atomic_linux_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_sparc
# include "atomic_linux_sparc.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_zero
# include "atomic_linux_zero.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_arm
# include "atomic_linux_arm.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_ppc
# include "atomic_linux_ppc.inline.hpp"
#endif

// Solaris
#ifdef TARGET_OS_ARCH_solaris_x86
# include "atomic_solaris_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_solaris_sparc
# include "atomic_solaris_sparc.inline.hpp"
#endif

// Windows
#ifdef TARGET_OS_ARCH_windows_x86
# include "atomic_windows_x86.inline.hpp"
#endif

// AIX
#ifdef TARGET_OS_ARCH_aix_ppc
# include "atomic_aix_ppc.inline.hpp"
#endif

// BSD
#ifdef TARGET_OS_ARCH_bsd_x86
# include "atomic_bsd_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_bsd_zero
# include "atomic_bsd_zero.inline.hpp"
#endif

#endif // SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP

例如,Windows系统X86架构会选择hotspot/src/os_cpu/windows_x86/vm/atomic_windows_x86.inline.hpp。

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {

  // 检测CPU是否多核,多核会返回1,单核返回0
  int mp = os::is_MP();

  // 内联汇编,通过cmpxchg指令执行CAS操作
  // 如果是多核处理器,会在cmpxchg前加上lock前缀
  // cmpxchg dword ptr [edx], ecx 含义为若eax的值于edx指向的内存处的值相等,则使用ecx的值替换edx指向的内存处的值
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

cmpxchg会采用内联汇编调用cmpxchg指令对内存进行CAS更新,所以x86 Windows系统中的CAS的原子性是由CPU硬件保证的。

x86架构的Linux也是通过内联汇编调用cmpxchgl指令实现的CAS。

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {

  // 检测CPU是否多核,多核会返回1,单核返回0
  int mp = os::is_MP();

  // 内联汇编,通过cmpxchgl指令执行CAS操作
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

自加操作

/**
 * 原子递增,返回旧值
 * i++的原子操作版本
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

/**
 * 原子递减,返回旧值
 * i--的原子操作版本
 */
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

/**
 * 将value加上delta,返回相加之前的value
 */
public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

/**
 * 原子递增,返回新值
 * ++i的原子操作版本
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
 * 原子递减,返回旧值
 * --i的原子操作版本
 */
public final int decrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

/**
 * 将value加上delta,返回相加之后的value
 */
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

上述方法都会调用Unsafe中的getAndAddInt方法。

/**
 * 原子地对指定偏移处的变量加上一个整数delta
 */
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        // 先获取旧值v
        v = getIntVolatile(o, offset);
        // 若值没有改变,则更新为v + delta
        // 否则获取新的值,再进行比较
        // 典型的CAS操作,不断重试,直到更新成功
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    // 返回相加之前的值
    return v;
}

一元计算更新

/**
 * 以当前值为参数调用updateFunction,将值更新为计算的结果
 * 返回更新前的值
 */
public final int getAndUpdate(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        // 获取当前值
        prev = get();
        // 计算
        next = updateFunction.applyAsInt(prev);
        // CAS更新为计算的结果
    } while (!compareAndSet(prev, next));
    // 返回更新前的值
    return prev;
}

/**
 * 以当前值为参数调用updateFunction,将值更新为计算的结果
 * 返回更新后的值
 */
public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        // 获取当前值
        prev = get();
        // 计算
        next = updateFunction.applyAsInt(prev);
        // CAS更新为计算的结果
    } while (!compareAndSet(prev, next));
    // 返回更新前的值
    return next;
}

二元计算更新

/**
 * 以当前值和传入x为参数调用accumulatorFunction,将值更新为计算的结果
 * 返回计算前的值
 */
public final int getAndAccumulate(int x,
                                  IntBinaryOperator accumulatorFunction) {
    int prev, next;
    do {
        // 获取旧值
        prev = get();
        // 计算
        next = accumulatorFunction.applyAsInt(prev, x);
        // CAS更新为计算后的值
    } while (!compareAndSet(prev, next));
    // 返回更新前的值
    return prev;
}

/**
 * 以当前值和传入x为参数调用accumulatorFunction,将值更新为计算的结果
 * 返回计算后的值
 */
public final int accumulateAndGet(int x,
                                  IntBinaryOperator accumulatorFunction) {
    int prev, next;
    do {
        // 获取旧值
        prev = get();
        // 计算
        next = accumulatorFunction.applyAsInt(prev, x);
        // CAS更新为计算后的值
    } while (!compareAndSet(prev, next));
    // 返回更新后的值
    return next;
}

总结

  1. AtomicInteger可以在不加锁的前提下实现值的原子更新。
  2. x86架构下原子性是由CPU硬件保证的。
上一篇:4.链表和递归


下一篇:数据机构之排序算法 ————快排