ConcurrentHashMap
看前
本博客只是为了记录,学习流程,学习源码其大部分看的是ConcurrentHashMap 1.8 源码分析这位大佬的博客,写的十分详细,图文并茂非常易懂,十分推荐。我这篇博客只是在他的基础上加了点自己的理解(肯定有不少理解不到位的地方)以及查询了一些没讲到的函数。
介绍
源码: java.util.concurrent.ConcurrentHashMap
HashMap有线程安全问题,导致出现链表死循环的情况。HashTable和ConcurrentHashMap没有线程安全问题,但二者上锁的方式不同,一个是HashTable全局一把锁,使用的是synchronized,ConcurrentHashMap是分段锁只锁住一个SegMent对象,其Segment对象继承于ReentrantLock。
JDK7: segment+Hashentry+reentrantlock
JDK8: synchronized+CAS
底层数据结构JDK7是segment数组+hashentry链表,JDK8是Node,treebin(红黑树),导致了二者的遍历时间复杂度有时不同。
源码分析
有了前面的HashMap基础,我们在讲解ConcurrentHashMap时只讲解二者不同之处。
主要属性
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
private transient volatile int sizeCtl;
我们可以发现相对于HashMap新加了一个sizeCtl,是一个非常重要的变量。它的意义可以解释为。
- sizeCtl :
- 默认为0,用来控制table的初始化和扩容操作
- -1 代表table正在初始化
- -N 表示有N-1个线程正在进行扩容操作
- 其余情况:
- 如果table未初始化,表示table需要初始化的大小。
- 如果table初始化完成,表示table的扩容阈值,默认是table大小的0.75倍
希望在看源码前对基本的构成心中有大概的轮廓,看代码才会更加轻松
内部类
Node
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
相较于HashMap的Node实现,ConcurrentHashMap的Node需要在子类中实现setValue,并且实现了该节点往后节点值得寻找方法find。
TreeNode
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
与HashMap不同,HashMap的TreeNode继承于LinkedHashMap.Entry<K,V>,并且使用了上百行代码对其进行了完整实现。但在ConcurrentHashMap中的TreeNode只是继承于Node,并且重写了find方法。对于树的完整实现放到了哪里呢?在ConcurrentHashMap中放到了TreeBin中。
TreeBin
这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。由于代码过多我们只贴出来他的构造函数。
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
我们可以看到和HashMap一样熟悉的建立红黑树的操作。
ForwardingNode
在执行transfer操作期间,作为桶位的头结点。
三个原子操作
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
- tabAt():获得在i位置上的Node节点
- casTabAt():利用CAS算法设置i位置上的Node节点。
- setTabAt():设置节点位置的值。
而casTabAt()与setTabAt()方法的区别:
所以真正要进行有原子语义的写操作需要使用casTabAt()方法,setTabAt()是在锁定桶的状态下需要使用的方法,如此方法实现只是带保守性的一种写法而已。
讲解的主要方法
//无参构造
public ConcurrentHashMap(){}
//设定容量
public ConcurrentHashMap(int initialCapacity){}
//将map转化为CHM
public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
//对table进行初始化
private final Node<K,V>[] initTable()
//增
public V put(K key, V value)
//在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树
private final void treeifyBin(Node<K,V>[] tab, int index)
数组扩容的主要方法就是transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
//查
public V get(Object key)
//删除
public V remove(Object key)
另外在讲解主要方法时会顺便讲解其调用的方法。
初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//table初始化的操作只能有一个线程执行,sizectl<0说明有另外一个线程正在初始化。那么本线程的初始化资格就让出去,不再执行后序的初始化操作。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//CAS将cs改为-1,表示本线程正在初始化。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//sizeCtl更新为容量的0.75
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
初始化方法主要应用了关键属性sizeCtl 如果这个值<0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。
构造函数
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
有了HashMap基础前两个好理解,不过第二个设定initialCapacity时,在最后将sizeCtl设置为cap。第三个将sizeCtl设置为初始容量。不用担心sizeCtl会在后序中进行改变。我们来看一下他调用的putAll方法。
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
其中tryPresize
private final void tryPresize(int size) {
//由于有容量2的次幂限制,将容量设为大于等于size的2的次幂值。与HashMap一样该功能由tableSizeFor实现。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//若table还未初始化,则进行初始化。
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//此线程扩容期间,不允许其他线程进行扩容。
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//c没有达到扩容阈值或本身长度已经到达最大值,则不扩容,直接退出。
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//初始化了,准备将值放到新的table中。
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
由于水平原因,笔者在这里遇到了个问题。
问题:31-39行代码永远不会执行。
解释:在6行对sc进行了赋值,进入while循环的都是sc>=0的情况。从while到执行sc<0之前的期间没有出现对sc重新赋值的情况,那么运行到sc<0的if判断时,一定为false。那么作者写这段代码的含义是什么?
下面讲解一下resizeStamp(n)函数,他执行之后,rs的二进制位表示中低16位为1,0-15位存储n二进制表示的最高非0位前0的个数值。不明白的看下面的博客讲解。
ConcurrentHashMap的源码分析-resizeStamp
结合上面的提示可以理解程序的执行流程,这里重点提一下40行的(rs << RESIZE_STAMP_SHIFT) + 2),CAS将SizeCtl的值变为(rs << RESIZE_STAMP_SHIFT) + 2),他的值一定为负数,因为rs的16位为1,在左移16位,那符号位为1,通过原子操作将SizeCtl变为相应的负数,并且创建一个两倍长度的新table,并赋值到nextTable上。下面我们只需要搞清楚一点如何将数据迁移到newtable中。
这是该类最难的transfer函数
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//若nextTab==null则建立newtable
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩张1倍的空间。
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//老数据存在0-n-1,所以下一个索引为n
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//玄幻处理,i表示当前处理的桶下标。bound表示分得区间的下界。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//控制桶的序号
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//判断是否结束
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//当前桶位空。插入一个fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//当前桶已经迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移桶内数据
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//处理节点是链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//处理节点是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer可以拆分为这几个部分
-
根据CPU数划分table的步长:
- 若为单核CPU,步长为table.length
- 若为多核CPU,步长为max((n >>> 3) / NPC, MIN_TRANSFER_STRIDE)
-
若传入的nextTab为null,创建nextTable
- 将创建的nextTab赋给全局变量nextTable
- 将用于控制扩容区间的分配transferIndex初始化为table.length
-
开始循环处理区间内各个桶的迁移,由advance控制程序。
- while循环对区间[bound, i]进行分配或调整(–i)(27-43)
- 开始下面四个分支
- i越界:判断当前区间的迁移是否结束;(45-59)
- 当前桶位null,插入一个fwd
- 当前桶已经迁移,跳过
- 其他:迁移桶内数据
- 链表(72-101)
- 红黑树(103-137)
-
27-43:控制区间
while (advance) {
int nextIndex, nextBound;
// 选择下一个桶或处理结束信号
if (--i >= bound || finishing)
advance = false;
// 没有新的区间可以分配
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 分配区间
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
while循环求得本次这一次多线程处理的区间范围。第一次循环1,2判断语句都不成立,进入第三个,生成第一个区间[bound,i]其中bound为nextBound,nextBound在cas中被改为nextIndex-stride,所以第一次区间为[nextIndex-stride, nextIndex-1],更新。下一次循环进入第一个判断语句,得到当前处理的i后,退出while,最后一次循环进入第二条判断语句,从后往前遍历到头了。
- 45-59:结束条件
// i<0时,已经完成扩容,暂不知道(i>=n || i+n>=nextn)的含义
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing为true则结束
if (finishing) {
nextTable = null; // 置空
table = nextTab; // 替换
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl设为0.75倍的table.length
return;
}
// finishing为false,但是该线程的任务已经完成,sizeCtl中的线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断sizeCtl是否回到刚开始扩容的状态
// 是则说明所有用于迁移的线程都结束工作,否则直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// finishing置为true
finishing = advance = true;
// i置为table.length,全部扫描一遍,确定所有桶都已经完成迁移
i = n; // recheck before commit
}
}
当i<0时,当前线程已经完成了table最后一个区间,对finishing进行判断:
-
finishing为true:
- 全局变量nextTable为空
- 替换table
- 设置sizeCtl
-
finishing为false:
- 原子操作对sizeCtl中线程数-1
- 判断是否所有线程都结束工作,否则返回,是则继续
- finishing为true
- i为table.length,下一个循环开始全局检查。
-
72-101:迁移链表
// 上锁,与putVal()同步,f为当前桶的头节点
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// hash>0为链表,树节点的hash=-2
if (fh >= 0) {
// 决定移动到低位桶还是高位桶
int runBit = fh & n;
Node<K,V> lastRun = f;
// 找到最后将迁移到同一个桶的所有节点,
// 这部分不需要创建新的节点,而是直接迁移
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 根据每个节点的hash值迁移,由于节点中的key是不可变的,需要创建新的节点
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
// 头插法,创建新节点时把自己作为next节点传入
// 最后链表的顺序将会颠倒(除了lastRun之后的)
ln = new Node<K,V>(ph, pk, pv, ln);
else
// 同上
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 原子操作把低位桶置入新表
setTabAt(nextTab, i + n, hn); // 原子操作把高位桶置入新表
setTabAt(tab, i, fwd); // 原子操作把fwd置入旧表表示已经迁移
advance = true;
}
else if (f instanceof TreeBin) {
...
}
}
}
与HashMap扩容一样,链表中要分为两支队伍,根据更高一位是0 or 1来分队伍。在这里用头插法,然后将两队挂到不同的对应索引下。第一个for循环用来找到并构建其中一个新链表的最后位置(因为第二个for中用到的是头插法构建新链表),也是后面for循环的终止条件。
- 103-137:迁移红黑树
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
...
}
// 节点为红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算==0的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是0的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
前面和HashMap中的split一致,逻辑也一致,只是使用了原子操作将两条链表或树挂到对应位置。
插入
主函数
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
注意:hash值进行了更新操作为 hashcode ^ (hashcode >>> 16)) & HASH_BITS
函数流程:
- table未被初始化,初始化table
- table已经初始化,但是对应桶为空,新建Node加入,跳出
- table已经初始化,且相应桶不为空,但桶正在迁移,当前线程帮助迁移。
- table已经初始化,且相应桶不为空,且该桶未在迁移,将键值对加入桶中。
- 桶中是链表,遍历这个链表:
- 若存在:根据onlyIfAbsent修改value,跳出
- 若不存在创建新的node挂到尾部,跳出
- 桶中是红黑树: 通过TreeNode中putTreeValue写入。
上面函数没有讲过的有treeifBin(),addCount()。二者都包含了扩容操作,但是扩容的结果是不同的。
将treeifyBin中使用tryPresize进行的扩容为P扩容。
将addCount进行的扩容称为C扩容。
- 桶中是链表,遍历这个链表:
treeifyBin
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 转化红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
如果插入一个键值对后,链表长度大于等于 TREEIFY_THRESHOLD,就需要进行扩容或转化红黑树。treeifyBin() 是这一操作的入口,首先判断 table 容量是否小于 MIN_TREEIFY_CAPACITY,如果是则进行 P 扩容(由 tryPresize() 进行的,有别于下文中由 addCount() 进行的),否则进行红黑树转化。
addCount
private final void addCount(long x, int check) {
// 计数部分,暂不考虑
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//CAS失败进入fulladdCount(x,false),否则fullAddCount(x,true)
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 判断是否C扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 满足三个条件则C扩容:1.size达到sizeCtl;2.table非空;3.table长度非最大值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 以下为C扩容,和tryPresize()中的一个分支完全一致
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
20行之前addCount通过CounterCell数组计算键值对个数,后面对check进行判断若其>=0,且
- 键值对数达到szieCtl
- table非空
- table长度并非最大值。
进行C扩容。下面的结构与tryPresize中的一个分支完全一致。
计数
主要讲解的就是addCount前20行实现。
通过 size() 可以获得当前键值对的数量,它将 sumCount() 获得的 long 类型的值转化为 int 返回。
sumCount() 则计算 baseCount 字段与 counterCells 数组中所有非空元素的记录值的和。
// 未发生争用前都用它计数
private transient volatile long baseCount;
// 用于同步counterCells数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// 支持多个线程同时通过counterCells中的多个元素计数
private transient volatile CounterCell[] counterCells;
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
// 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
CHM将每个桶中的键值对数量保存在baseCount和CounterCell 数组 counterCells 中。在什么地方对baseCount 和 counterCells进行赋值的呢?在addCount前20行。
addCount()中有两层if,第一层:
- counterCells不为空:跳转到第二层if;
- counterCells为空:CAS操作增加baseCount,成功结束if,失败进入二层if
第二层if创建counterCells - counterCells==null || couterCells[线程hash]==null,调用fullAddCount(x,true)
- couterCells[线程hash]!=null,CAS更改cellvalue值
- 成功:结束
- 失败:调用fullAddCount(x, false)【线程冲突】
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//线程Hash是否初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//初始化
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
//初始化后不会发生冲突
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//counterCells是否可以用来计数,即判断他整体是否为空
if ((as = counterCells) != null && (n = as.length) > 0) {
//这个桶为空,初始化counterCells
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
//初始化,记录这个桶内的元素数量为要新建的元素个数。
CounterCell r = new CounterCell(x); // Optimistic create
//尝试拿锁
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
//拿到锁了,将新建的counterCell元素放到指定位置。
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
//初始化创建完成
created = true;
}
} finally {
//无论成功与否都释放锁。
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;//无需扩容
}
//存在竞争,重置wasUncontended后,下一句就是rehash,然后重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//上面获取锁失败,说明有其他线程得到了锁,那么尝试将值加到BaseCount上
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//counterCells是否扩容or其长度是否>=处理器数
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//没有扩容,数组长度<处理器数量,collide为true,计划给CounterCells数组扩容。
else if (!collide)
collide = true;
//尝试拿锁
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//给counterCells扩容
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
//放锁
cellsBusy = 0;
}
//不需要扩容
collide = false;
continue; // Retry with expanded table
}
//更改当前线程的hash值
h = ThreadLocalRandom.advanceProbe(h);
}
//没有执行上面的if说明counterCells为null,初始化counterCells
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];//初始长度为2,最小的2次幂
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;//初始化成功
}
} finally {
//放锁
cellsBusy = 0;
}
if (init)
break;
}
//counterCells为空or长度为0,并且拿锁失败。
//尝试将其加到basecount上
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
如何理解流程呢?
他的主体就是for,循环中三个if
-
if ((as = counterCells) != null && (n = as.length) > 0)
counterCells非空情况 -
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
counterCells为null -
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
counterCells为空且获取锁失败 - h = ThreadLocalRandom.advanceProbe(h);重新生成hash
三种情况,具体的每种情况的逻辑看具体的注释。
读取
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 首节点即匹配
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 该桶已经被迁移,则交由节点find()方法查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 搜索桶内所有节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
通过查看注释十分好理解,在首点匹配,桶迁移情况(调用find寻找)匹配失败后,搜索所有节点。
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
参考
- ConcurrentHashMap源码分析(1.8)
- 源码级解读ConcurrentHashMap,妈妈再也不用担心我的学的学习
- ConcurrentHashMap底层原理剖析
- ConcurrentHashMap 1.8 源码分析