ConcurrentHashMap

ConcurrentHashMap

高并发下的HashMap

举一个例子 : 扩容Resize:

  • HashMap的容量是有限的。当经过多次元素插入,使得HashMap达到一定饱和度时,Key映射位置发生冲突的几率会逐渐提高。
  • 这时候,HashMap需要扩展它的长度,也就是进行Resize
  • 扩容条件:HashMap.Size(元素个数) >= Capacity(当前容量) * LoadFactor(扩容因子)
  • 步骤:
    1. 扩容:创建一个新的Entry数组,长度是原来的两倍
    2. ReHash:重新对原来的Entry的key进行Hash(index = HashCode(Key) & (Length- 1))在jdk1.8时进行了优化,原来的node要么还在原来的位置,要不就会加上扩容的长度
  • ReHash在并发的情况下可能会形成链表环。
  • 漫画:高并发下的HashMap

高并发下HashMap的解决方法

  • HashTable
  • Collections.synchronizedMap
  • ConcurrentHashMap(分段锁 -> CAS + synchronized)

ConcurrentHashMap:


JDK1.7

  • Segment:

    • Segment本身就相当于一个HashMap对象。同HashMap一样,Segment包含一个HashEntry数组,数组中的每一个HashEntry既是一个键值对,也是一个链表的头节点。

    • ConcurrentHashMap

    • 可以说,ConcurrentHashMap是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表

    • ConcurrentHashMap当中每个Segment各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

      • 不同Segment的写入是可以并发执行的。
      • 同一Segment的写和读是可以并发执行的。
      • Segment的写入是需要上锁的,因此对同一Segment的并发写入会被阻塞
  • Get:

    1.为输入的Key做Hash运算,得到hash值。

    2.通过hash值,定位到对应的Segment对象

    3.再次通过hash值,定位到Segment当中数组的具体位置。

  • Put:

    1.为输入的Key做Hash运算,得到hash值。

    2.通过hash值,定位到对应的Segment对象 -- 第一次定位 到 Segment

    3.获取可重入锁

    4.再次通过hash值,定位到Segment当中数组的具体位置。-- 第二次定位 到 具体元素

    5.插入或覆盖HashEntry对象。

    6.释放锁。

  • size:元素数量

    1.遍历所有的Segment。

    2.把Segment的元素数量累加起来。

    3.把Segment的修改次数累加起来。

    4.判断所有Segment的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计最多三次),尝试次数+1;如果不是。说明没有修改,统计结束。

    5.如果尝试次数超过阈值,则对每一个Segment加锁,再重新统计。 -- 升级锁

    6.再次判断所有Segment的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。

    7.释放锁,统计结束。


JDK1.8

  • 从JDK1.7版本的ReentrantLock(重入锁)+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树

  • 优化

    • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
    • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
    • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
    • JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
      • 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
      • JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
      • 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据
  • put

    1. 步骤:对当前的table进行无条件自循环直到put成功

      1. 如果没有初始化就先调用initTable()方法来进行初始化过程

      2. 如果没有hash冲突就直接CAS插入 -- 两次哈希,减少哈希冲突

      3. 如果还在进行扩容操作就先进行扩容

      4. 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,

      5. 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环;

      6. 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

          public V put(K key, V value) {
              return putVal(key, value, false);
          }
          /** Implementation for put and putIfAbsent */
          final V putVal(K key, V value, boolean onlyIfAbsent) {
              if (key == null || value == null) throw new NullPointerException();
              int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布
              int binCount = 0;
              for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
                  Node<K,V> f; int n, i, fh;
                  //这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
                  if (tab == null || (n = tab.length) == 0)
                      tab = initTable();
                  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
                      if (casTabAt(tab, i, null,
                                   new Node<K,V>(hash, key, value, null)))
                          break;                   // no lock when adding to empty bin
                  }
                  else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
                      tab = helpTransfer(tab, f);
                  else {
                      V oldVal = null;
                      //如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
                      synchronized (f) {
                          if (tabAt(tab, i) == f) {
                              if (fh >= 0) { //表示该节点是链表结构
                                  binCount = 1;
                                  for (Node<K,V> e = f;; ++binCount) {
                                      K ek;
                                      //这里涉及到相同的key进行put就会覆盖原先的value
                                      if (e.hash == hash &&
                                          ((ek = e.key) == key ||
                                           (ek != null && key.equals(ek)))) {
                                          oldVal = e.val;
                                          if (!onlyIfAbsent)
                                              e.val = value;
                                          break;
                                      }
                                      Node<K,V> pred = e;
                                      if ((e = e.next) == null) {  //插入链表尾部
                                          pred.next = new Node<K,V>(hash, key,
                                                                    value, null);
                                          break;
                                      }
                                  }
                              }
                              else if (f instanceof TreeBin) {//红黑树结构
                                  Node<K,V> p;
                                  binCount = 2;
                                  //红黑树结构旋转插入
                                  if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                                 value)) != null) {
                                      oldVal = p.val;
                                      if (!onlyIfAbsent)
                                          p.val = value;
                                  }
                              }
                          }
                      }
                      if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
                          if (binCount >= TREEIFY_THRESHOLD)
                              treeifyBin(tab, i);
                          if (oldVal != null)
                              return oldVal;
                          break;
                      }
                  }
              }
              addCount(1L, binCount);//统计size,并且检查是否需要扩容
              return null;
          }
    
      - initTable() -- 初始化
      - helpTransfer() -- 多线程扩容(transfer():扩容)
      - treeifyBin() -- 链表转红黑树
      - addCount() -- 计算size 和 是否进行扩容
    
  • get:

    1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回
    2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
    3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算两次hash
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
            if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
            //查找,查找到就返回
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    
  • size:

    在put操作时,就已经调用了addCount,将size计算好了

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a; //变化的数量
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    
  • 扩容

    1. 单线程新建nextTable,新容量一般为原table容量的两倍。
    2. 每个线程想增/删元素时,如果访问的桶是ForwardingNode节点,则表明当前正处于扩容状态,协助一起扩容完成后再完成相应的数据更改操作。
    3. 扩容时将原table的所有桶倒序分配,每个线程每次最小分配16个桶,防止资源竞争导致的效率下降。单个桶内元素的迁移是加锁的,但桶范围处理分配可以多线程,在没有迁移完成所有桶之前每个线程需要重复获取迁移桶范围,直至所有桶迁移完成。
    4. 一个旧桶内的数据迁移完成但不是所有桶都迁移完成时,查询数据委托给ForwardingNode结点查询nextTable完成(这个后面看find()分析)。
    5. 迁移过程中sizeCtl用于记录参与扩容线程的数量,全部迁移完成后sizeCtl更新为新table容量的0.75倍。

ConcurrentHashMap

上一篇:在vue中使用 Echarts组件化 父子组件--> 动态刷新


下一篇:磁盘挂载