ConcurrentHashMap Source Code Analysis, JDK 1.8 (Personal Notes)
If anything here is wrong, please point it out so we can learn and improve together.
A thread-safe HashMap
HashMap is built on a hash table and offers fast lookups and inserts, but it is not thread-safe. ConcurrentHashMap was introduced as a thread-safe hash-table-based collection.
The underlying data structure of ConcurrentHashMap is an array + linked lists + red-black trees; concurrency is controlled with synchronized and CAS operations.
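Before diving into the source, a minimal usage sketch (class and variable names are made up for illustration) shows the guarantee the rest of this analysis explains: many threads can update the map concurrently without any external locking.

import java.util.concurrent.ConcurrentHashMap;

public class ChmQuickDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[10];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    // merge() applies the remapping atomically for the key
                    map.merge("hits", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        System.out.println(map.get("hits")); // always prints 10000
    }
}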
1. Fields and Constants
Key constants:
/**
* The largest possible table capacity. This value must be
* exactly 1<<30 to stay within Java array allocation and indexing
* bounds for power of two table sizes, and is further required
* because the top two bits of 32bit hash fields are used for
* control purposes.
* Maximum table capacity
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* The default initial table capacity. Must be a power of 2
* (i.e., at least 1) and at most MAXIMUM_CAPACITY.
* Default initial capacity
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
* Maximum array length used by toArray and related methods
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
* Left over from 1.7, where it expressed the concurrency level; in 1.8 it is only consulted during construction and no longer means a concurrency level
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* The load factor for this table. Overrides of this value in
* constructors affect only the initial table capacity. The
* actual floating point value isn't normally used -- it is
* simpler to use expressions such as {@code n - (n >>> 2)} for
* the associated resizing threshold.
* Load factor (resize threshold = current capacity * load factor)
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2, and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
* Bin size at which a linked list is converted to a red-black tree
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* The bin count threshold for untreeifying a (split) bin during a
* resize operation. Should be less than TREEIFY_THRESHOLD, and at
* most 6 to mesh with shrinkage detection under removal.
* Bin size at which a red-black tree is converted back to a linked list
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* The smallest table capacity for which bins may be treeified.
* (Otherwise the table is resized if too many nodes in a bin.)
* The value should be at least 4 * TREEIFY_THRESHOLD to avoid
* conflicts between resizing and treeification thresholds.
* Minimum table capacity (not bin size) required before bins are treeified
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
* Minimum stride per resizing thread (each thread is assigned at least 16 consecutive bins to migrate)
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
* Number of bits used for the resize stamp in sizeCtl (the stamp identifies one particular resize; every thread helping with the same resize sees the same stamp)
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
* Maximum number of threads that can help with a resize
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The bit shift for recording size stamp in sizeCtl.
* During a resize, sizeCtl = (stamp << RESIZE_STAMP_SHIFT) + (1 + number of resizing threads)
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
// -1 marks a forwarding (FWD) node: the map is resizing and this bin has already been migrated to the new array
static final int MOVED = -1; // hash for forwarding nodes (FWD node)
// -2 marks the root node of a treeified bin
static final int TREEBIN = -2; // hash for roots of trees (TreeBin node)
// -3 marks a placeholder reserved by compute/computeIfAbsent
static final int RESERVED = -3; // hash for transient reservations
// Usable bits of a normal node hash (masking with this keeps the hash non-negative)
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/** Number of CPUS, to place bounds on some sizings */
// Number of CPUs
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** For serialization compatibility. */
// Retained only for serialization compatibility with 1.7
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};
// Unsafe mechanics
private static final sun.misc.Unsafe U;
/** Memory offset of the sizeCtl field within a ConcurrentHashMap instance */
private static final long SIZECTL;
/** Memory offset of the transferIndex field within a ConcurrentHashMap instance */
private static final long TRANSFERINDEX;
/** Memory offset of the baseCount field within a ConcurrentHashMap instance */
private static final long BASECOUNT;
/** Memory offset of the cellsBusy field within a ConcurrentHashMap instance */
private static final long CELLSBUSY;
/** Memory offset of the value field within a CounterCell instance */
private static final long CELLVALUE;
/** Offset of the first element of a Node[] array */
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
// scale is the size in bytes of one element (reference) of a Node[] array
int scale = U.arrayIndexScale(ak);
// Check that scale is a power of two
// e.g.   10 &   01 = 0
//       100 &  011 = 0
//      1000 & 0111 = 0
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// numberOfLeadingZeros returns how many consecutive 0 bits precede the highest 1 bit of the value
// so ASHIFT works out to the number of trailing zero bits of scale, i.e. log2(scale)
// e.g. scale = 4 -> binary 100 -> ASHIFT = 2
// The offset of element n of a Node[] is ABASE + n * scale
// n * scale can be rewritten as n << ASHIFT
// so the offset of element n becomes ABASE + (n << ASHIFT); see the sketch after this listing
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
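The sketch below (class name made up), referenced in the static-block comments above, verifies the arithmetic tricks used by these constants with plain numbers: the n - (n >>> 2) shortcut for a 0.75 threshold mentioned in the LOAD_FACTOR comment, the (x & (x - 1)) power-of-two test, and ASHIFT being log2 of the element scale.

public class ChmConstantMath {
    public static void main(String[] args) {
        // Resize threshold shortcut from the LOAD_FACTOR comment: n - (n >>> 2) == 0.75 * n
        int n = 16;
        System.out.println(n - (n >>> 2));      // 12
        System.out.println((int) (n * 0.75f));  // 12

        // Power-of-two test used on the array element scale in the static block
        int scale = 4;                          // a typical reference size; the real value is JVM-dependent
        System.out.println((scale & (scale - 1)) == 0); // true

        // ASHIFT = 31 - numberOfLeadingZeros(scale) is log2(scale), so i * scale == i << ASHIFT
        int ashift = 31 - Integer.numberOfLeadingZeros(scale); // 2
        int i = 5;
        System.out.println(i * scale == (i << ashift));         // true
    }
}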
Key instance fields:
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
* The hash table (bucket array)
*/
transient volatile Node<K,V>[] table;
/**
* The next table to use; non-null only while resizing.
* The temporary table used while resizing
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
* Element count, updated via CAS. When concurrent updates to baseCount collide,
* the counterCells array is created and CounterCell objects absorb the increments;
* the total element count is then baseCount plus the sum of all CounterCell values
*/
private transient volatile long baseCount;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
* -1: the table is currently being initialized
* < -1: a resize is in progress; the high 16 bits hold the resize stamp and the low 16 bits hold 1 + the number of resizing threads
* 0: default, the table has not been created yet
* > 0: 1. if the table is not yet initialized, the initial size to use
*      2. if the table is initialized, the element count threshold for the next resize
*/
private transient volatile int sizeCtl;
/**
* The next table index (plus one) to split while resizing.
* The next table index (plus one) to be claimed for migration; resizing claims ranges from the end of the array toward index 0
*/
private transient volatile int transferIndex;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
* 0 = unlocked, 1 = locked
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
* Created when concurrent updates to baseCount collide; CounterCell objects absorb the increments,
* and the total element count is baseCount plus the sum of all CounterCell values (see the sketch after this field list)
*/
private transient volatile CounterCell[] counterCells;
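To make the baseCount / counterCells relationship concrete, here is a simplified model of the counting scheme. The class below is a sketch with made-up names, not the JDK code; the real map performs the same summation in its sumCount() helper.

// Simplified model of how the element count is accumulated and read back.
class CounterModel {
    static final class Cell {
        volatile long value;
        Cell(long v) { value = v; }
    }

    volatile long baseCount; // updated by CAS while there is no contention
    volatile Cell[] cells;   // created lazily once a CAS on baseCount fails

    long sum() {
        long sum = baseCount;
        Cell[] cs = cells;
        if (cs != null) {
            for (Cell c : cs) {
                if (c != null) sum += c.value; // total = baseCount + every cell's value
            }
        }
        return sum;
    }
}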
2. The put Method
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
// onlyIfAbsent == true:  keep the existing value if the key is already present (putIfAbsent)
// onlyIfAbsent == false: overwrite the existing value (plain put)
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// Perturb the hash code to reduce collisions and produce the final hash (see the demo after this method)
int hash = spread(key.hashCode());
// Bin node counter:
// 2 means a tree bin (or the 2nd node of a linked list)
// > 0 means the n-th node of a linked list
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f  head node of the bin
// n  length of the table array
// i  array index computed from the hash
// fh hash of the head node
Node<K,V> f; int n, i, fh;
// Initialize the table if it has not been created yet
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// If the bin is empty, try to put the data directly
// tabAt() reads the current bin
// (n - 1) & hash is the index calculation
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// If the CAS loses a race with another thread, keep spinning
// casTabAt() updates the bin via CAS
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// If the head node is a forwarding (FWD) node, help with the ongoing resize
else if ((fh = f.hash) == MOVED)
// Join the resize
tab = helpTransfer(tab, f);
// Remaining case: a hash collision (the bin already holds data)
else {
// Holds the old value, if any
V oldVal = null;
// Lock the head node of this bin
synchronized (f) {
// Re-check that the head node was not replaced by another thread
// tabAt(tab, i) re-reads the current head of the bin
if (tabAt(tab, i) == f) {
// A head hash >= 0 means this bin is a linked list
// -1 marks a FWD node (the map is resizing and this bin has already been migrated to the new array)
// MOVED = -1;
// -2 marks the root of a treeified bin
// TREEBIN = -2;
// RESERVED = -3;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
// Key of the node being examined
K ek;
// The current node's key equals the key being inserted
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// Remember the node's current value
oldVal = e.val;
if (!onlyIfAbsent)
// Overwrite the node's value
e.val = value;
break;
}
// Remember the current node as the predecessor
Node<K,V> pred = e;
// If there is no next node, we have reached the tail of the list
if ((e = e.next) == null) {
// Append the new node at the tail
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// If the bin is a red-black tree
else if (f instanceof TreeBin) {
// Node whose key matches the key being inserted, if any
Node<K,V> p;
binCount = 2;
// Delegate to TreeBin to insert the mapping or find the existing node
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// Remember the node's current value
oldVal = p.val;
if (!onlyIfAbsent)
// Overwrite the node's value
p.val = value;
}
}
}
}
// binCount != 0 means the bin was handled; 0 means nothing was inserted yet, so keep spinning
if (binCount != 0) {
// If the treeify threshold is reached, convert the list to a tree
// (treeifyBin resizes instead while the table is still smaller than MIN_TREEIFY_CAPACITY)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// oldVal != null means an existing mapping was replaced: return oldVal directly without incrementing the element count
if (oldVal != null)
return oldVal;
break;
}
}
}
// Adjust the element count: add one for the newly inserted mapping
addCount(1L, binCount);
// No existing mapping was replaced, so return null
return null;
}
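Two small pieces of arithmetic drive the method above: spread() folds the high 16 bits of the hashCode into the low 16 bits and clears the sign bit (so a normal hash never collides with MOVED/TREEBIN/RESERVED), and (n - 1) & hash selects the bucket, which works because n is always a power of two. The demo class name below is made up; the spread logic matches the JDK 8 source.

public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff;

    // Same perturbation as ConcurrentHashMap.spread() in JDK 8
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int n = 16;                               // table length, always a power of two
        int hash = spread("example".hashCode());
        int index = (n - 1) & hash;               // equivalent to hash % n for a power-of-two n
        System.out.println("hash=" + hash + ", bucket=" + index);
    }
}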
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// ASHIFT is log2 of the size of one Node[] element (that size is always a power of two)
// The offset of element i of a Node[] is ABASE + (i << ASHIFT); a sketch after these three helpers shows the equivalent semantics
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
// ASHIFT is log2 of the size of one Node[] element (that size is always a power of two)
// The offset of element i of a Node[] is ABASE + (i << ASHIFT)
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
// ASHIFT is log2 of the size of one Node[] element (that size is always a power of two)
// The offset of element i of a Node[] is ABASE + (i << ASHIFT)
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
// nextTab  the new table being populated during the resize
// sc       local copy of sizeCtl
Node<K,V>[] nextTab; int sc;
// Proceed only if tab is non-null, the node is a FWD node, and its nextTable is non-null
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// Compute the resize stamp for the current table length (positive here; it only becomes negative once shifted into the high bits of sizeCtl)
int rs = resizeStamp(tab.length);
// These conditions hold while the resize is still in progress
// sizeCtl < 0 means a resize is under way
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// (sc >>> RESIZE_STAMP_SHIFT) != rs: the stamp derived from sc does not match the current resize stamp; while sizeCtl is negative, its high 16 bits hold the stamp and its low 16 bits hold 1 + the number of resizing threads
// transferIndex <= 0: all migration ranges have been claimed; transferIndex starts at the end of the array and moves toward 0 as ranges are handed out
// sc == rs + 1 is a known bug in this check (reported in the JDK bug tracker): it should be sc == (rs << RESIZE_STAMP_SHIFT) + 1, meaning the resize has finished
// sc == rs + MAX_RESIZERS has the same bug: it should be sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS, meaning the maximum number of resizing threads has been reached
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// Try to CAS the resizing-thread count up by one, then join the transfer
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
// Return the new (post-resize) table
return nextTab;
}
return table;
}
/**
* Returns the stamp bits for resizing a table of size n.
* Must be negative when shifted left by RESIZE_STAMP_SHIFT.
*/
static final int resizeStamp(int n) {
// Setting bit 15 guarantees the stamp becomes negative once it is shifted left by RESIZE_STAMP_SHIFT into sizeCtl (see the demo after this method)
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
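A quick check of the stamp arithmetic used by helpTransfer() (constants copied from above, class name made up): resizeStamp(16) is a small positive number, it becomes negative only after being shifted into the high 16 bits of sizeCtl, and the low 16 bits then carry 1 + the number of active resizing threads.

public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);
        System.out.println(rs);                        // 32795: positive on its own
        int sc = (rs << RESIZE_STAMP_SHIFT) + 2;       // sizeCtl as set by the first resizing thread
        System.out.println(sc < 0);                    // true: a negative sizeCtl signals "resizing"
        System.out.println(sc >>> RESIZE_STAMP_SHIFT); // 32795: the stamp recovered from the high 16 bits
        System.out.println((sc & 0xffff) - 1);         // 1: number of threads currently resizing
    }
}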
3. The addCount Method
To be continued.
References:
① https://www.jianshu.com/p/865c813f2726