CurrentHashMap 为什么线程安全

1. HashTable与HashMap的区别

1.1. HashMap

HashMap是线程不安全的,在多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。

参考:HashMap为什么不安全

1.2. HashTable

HashTable和HashMap的实现原理几乎一样,差别无非是

HashTable 不允许 key 和 value 为 null
HashTable 是线程安全的

但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是 synchronized 的,这相当于给整个哈希表加了一把大锁。

多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。

CurrentHashMap 为什么线程安全CurrentHashMap 为什么线程安全

2. JDK1.7 中 CurrentHashMap

2.1. CurrentHashMap 的结构

ConcurrentHashMap 采用了数组 + Segment + 分段锁的方式实现。

ConcurrentHashMap 中的分段锁称为 Segment,Segment 用来减少锁的粒度,Segment 类似于 HashTable 的结构,即内部拥有一个 Entry 数组,数组中的每个元素又是一个链表。

CurrentHashMap 为什么线程安全

简单的理解,就是 ConcurrentHashMap 中有多个 Segment,每一个 Segment 都会有一把锁,所以每个Segment 中只能有一个线程执行,可以把 Segment 当做 HashTable 来看待

默认情况下 Segment 的大小为 16,所以理想状态下,多线程中 ConcurrentHashMap 的速度是 HashTable 的 16 倍。

CurrentHashMap 为什么线程安全

2.3. 该结构的优劣势

坏处:这一种结构的带来的副作用是 hash 的过程要比普通的 HashMap 要长,因为要 hash 两次,第一次 hash 确定 Segment 数组的角标,第二次 hash 确定 Segment 中的 Entry 数组角标。

好处:写操作的时候可以只对元素所在的 Segment 进行加锁即可,不同 Segment 之间不存在锁的竞争关系,这样,在最理想的情况下,ConcurrentHashMap 可以最高同时支持 Segment 数量大小的写操作

3. JDK1.8 中 CurrentHashMap

3.1. CurrentHashMap 的结构

JDK1.8的currentHashMap结构与 JDK1.8的HashMap的实现方式没什么区别,都采用了数组+链表+红黑树的实现方式。

JDK1.7:HashMap 采用数组 + 链表
JDK1.8:HashMap 采用数组 + 链表 + 红黑树

JDK1.8 为什么采用多加个红黑树? 因为JDK1.7中,当 HashMap 中的数据越来越多时,产生 hash 碰撞的数据也会越来越多,这会导致在一条链表中的数据越来越多,从而导致获取数据时间太长。

而JDK1.8会在链表达到一定长度时,会将链表转为红黑树结构,如下图所示
CurrentHashMap 为什么线程安全

3.2 如何保证线程安全

JDK8中彻底放弃了Segment转而采用的是Node,其设计思想也不再是JDK1.7中的分段锁思想。

取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现

3.2.1. Node

Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。

class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //... 省略部分代码
}

3.2.2. put操作保证线程安全

JDK1.8的 CurrentHashMap 在 put 时保证线程安全使用到了 cas 操作以及 synchronized 关键字来保证线程安全。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果数组"空",进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
 
        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,
            // 用一次 CAS 操作将这个新值放入其中,
            // 如果 CAS 失败,那就是有并发操作,进到下一个循环
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 等于 MOVED,扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
 
        else { // 到这里就是说,f 是该位置的头结点,而且不为空
 
            V oldVal = null;
            // 获取数组该位置的头结点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
 
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 
    addCount(1L, binCount);
    return null;
}
上一篇:Quest v31 Passthrough API无法透视的问题解决办法


下一篇:Java项目:(小程序)网上商城系统(weixin-java-mp+VUE+iview+bootstrap)