concurrentHshMap为什么是线程安全的
1.concurrenthshmap跟hashmap比,为什么是线程安全的
它使用Segment和HashEntry两个数组组成,加锁同步加存储K,V键值。
每一个ConcurrentHashMap都包含了一个Segment数组,在Segment数组中每一个Segment对象则又包含了一个HashEntry数组,
而在HashEntry数组中,每一个HashEntry对象保存K-V数据的同时又形成了链表结构,此时与HashMap结构相同。
在多线程中,每一个Segment对象守护了一个HashEntry数组,当对ConcurrentHashMap中的元素修改时,在获取到对应的Segment数组角标后,都会对此Segment对象加锁,之后再去操作后面的HashEntry元素,这样每一个Segment对象下,都形成了一个小小的HashMap。
1.1.concurrentHashMap属性
//默认最大的容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的数组可能长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认的并发级别,目前并没有用,只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap一样,负载因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap一样,链表转换为红黑树的阈值,默认是8
static final int TREEIFY_THRESHOLD = 8;
//红黑树转换链表的阀值,默认是6
static final int UNTREEIFY_THRESHOLD = 6;
//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//table扩容时, 每个线程最少迁移table的槽位个数
private static final int MIN_TRANSFER_STRIDE = 16;
//感觉是用来计算偏移量和线程数量的标记
private static int RESIZE_STAMP_BITS = 16;
//能够调整的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED = -1;
//TREEBIN, 置为-2, 代表此元素后接红黑树
static final int TREEBIN = -2;
//感觉是占位符,目前没看出来明显的作用
static final int RESERVED = -3;
//主要用来计算Hash值的
static final int HASH_BITS = 0x7fffffff;
//节点数组
transient volatile Node[] table;
//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile Node[] nextTable;
//基础计数器
private transient volatile long baseCount;
//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化
//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int sizeCtl;
//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int transferIndex;
//扩容时候,CAS锁标记
private transient volatile int cellsBusy;
//计数器表,大小是2次幂
private transient volatile CounterCell[] counterCells;
1.2.concurrentHshMap的put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // table是在首次插入元素的时候初始化,lazy
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, // 如果这个位置没有值,直接放进去,由CAS保证线程安全,不需要加锁
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 节点上锁,这里的节点可以理解为hash值相同组成的链表的头节点,
// 锁的粒度为头节点。
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
1.2.1.ConcurrentHashMap的put方法的主要流程
- 1.判断key和vaule是否为空,如果为空,直接抛出异常。
- 2.判断table数组是否已经初始化完毕,如果没有初始化,进行初始化。
- 3.计算key的hash值,如果该位置为空,直接构造节点放入。
- 4.如果table正在扩容,进入帮助扩容方法。
- 5.最后开启同步锁,进行插入操作,如果开启了覆盖选项,直接覆盖,否则,构造节点添加到尾部,如果节点数超过红黑树阈值,进行红黑树转换。如果当前节点是树节点,进行树插入操作。
- 6.最后统计size大小,并计算是否需要扩容。
(*这里的node数组就是上文中的segment数组)
2.为什么ConcurrentHashMap比HashTable高效
- ConcurrentHashMap里使用了Segment分段锁+HashEntry,
- 而HashTable用的是Syncronized锁全部,所有线程竞争一把锁。
Segment分段锁继承ReentrantLock,在并发数高的时候,ReentrantLock比Syncronized总体开销要小一些。