ConcurrentHashMap浅析

部分来自于网上,有雷同的望谅解

知识点预知

1:HashMap知识

2:Hashtable知识

3:volatie知识

4:reenantLock知识

通过以上四个知识点的基础再理解ConcurrentHashMap会容易很多

一,ConcurrentHashMap数据结构

 1:与HashTable的数据结构对比


                                                          ConcurrentHashMap浅析 ConcurrentHashMap浅析

左边是hashtable,右边是ConcurrentHashMap,hashtable的实现方式锁整个表,而ConcurrentHashMap的实现方式是锁桶(或者段)

2:ConcurrentHashMap示意图

                                                                 ConcurrentHashMap浅析 

二,ConcurrentHashMap的数据成员、构造方法、常用方法

 

1、segmentMask和segmentShift

final   int  segmentMask; 

final   int  segmentShift; 

这两个字段主要用于定位段,即就是通过key的hash值在使用这两个属性查找index,

2、segments

final  Segment<K,V>[] segments;  

继续看 Segment的数据成员

static final class Segment< K, V>  extends ReentrantLock  implements Serializable {

transient volatile int  count

transient volatile HashEntry< K,  V>[]  table;

final float  loadFactor;

transient int  threshold;

transient int  modCount;

}

2.1、count

     

transient volatile int  count

     是用来统计该段数据的个数,它是volatile修饰的,用来协调修改和读取操作,以保证读取操作能够读到几乎最新的count修改值。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),

都要写count值,每次读取操作开始都要读取count的值。这利用了Java 5中对volatile语义的增强, 对同一个volatile变量的写和读存在happens-before关系

2.2、modCount

统计段结构改变的次数, 主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会细说。

2.3、threashold

用来表示需要进行rehash的界限值,如果HashEntry元素的个数超过该值时,会出发table再散列rehash

2.4、table

transient volatile HashEntry< K,  V>[]  table;

table数组存储段中节点,每个数组元素是个hash链, 用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步

2.5、loadFactor

final float  loadFactor;

表示负载因子。

另外看看HashEntry的数据成员:

static final class HashEntry< K,  V> { 
     final Object  key; 
     final int  hash; 
     volatile Object  value;

     final HashEntry< K,  V>  next;

}

  

用来封装散列映射表中的键值对。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。

在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“分离链接法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。

( 由于只能在表头插入,所以链表中节点的顺序和插入的顺序相反)                                                                            

下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图:

                                                                               ConcurrentHashMap浅析ConcurrentHashMap浅析

3.无参构造方法

   

public ConcurrentHashMap() { 
     this( DEFAULT_INITIAL_CAPACITY,  DEFAULT_LOAD_FACTOR,  DEFAULT_CONCURRENCY_LEVEL); 
}

创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 的空散列映射表

4.带有三个参数的构造方法

(创建一个带有指定初始容量、加载因子和并发级别的新的空映射)

public ConcurrentHashMap( 
         int initialCapacity,  float loadFactor, 
         int concurrencyLevel) { 
     if (!(loadFactor >  0) || initialCapacity <  0 || concurrencyLevel <=  0) { 
         throw new IllegalArgumentException(); 
    } 

     if (concurrencyLevel >  MAX_SEGMENTS) { 
        concurrencyLevel =  MAX_SEGMENTS; 
    } 

     // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂)。

     int sshift =  0; 
     int ssize =  1; 
     while (ssize < concurrencyLevel) { 
        ++ sshift; 
        ssize <<=  1; 
    }

     segmentShift =  32 - sshift;// 偏移量值

     segmentMask = ssize -  1;// 掩码值

     this. segments = Segment.newArray(ssize); // 创建数组


     if (initialCapacity >  MAXIMUM_CAPACITY) { 
        initialCapacity =  MAXIMUM_CAPACITY; 
    } 
     int c = initialCapacity / ssize; 
     if (c * ssize < initialCapacity) { 
        ++ c; 
    } 
     int cap =  1; 
     while (cap < c) { 
        cap <<=  1; 
    }

    // 依次遍历每个数组元素

     for ( int i =  0; i <  this. segments. length; ++ i) {

        // 初始化每个数组元素引用的 Segment 对象       

       this. segments[i] =  new Segment< K,  V>(cap, loadFactor);

    } 
}

参数 concurrencyLevel表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。

比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。 默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。

理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel, 因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,

并且需要访问的数据都恰好分别落在不同的Segment中, 则这些线程能够无竞争地*访问(因为他们不需要竞争同一把锁),达到同时访问的效果。

5.put方法

public  V put( K key,  V value) {

     if (value ==  null) {//中不允许用 null 作为映射值

         throw new NullPointerException(); 
    }

     int hash = hashOf(key);// 计算键对应的散列码

     return segmentFor(hash).put(key, hash, value,  false);// 根据散列码找到对应的 segment

}

使用 key 的散列码来得到 segments 数组中对应的 Segment

final Segment< K,  V> segmentFor( int hash) { 
     return  segments[hash >>>  segmentShift &  segmentMask]; 
}

最终的put操作方法:

V put( K key,  int hash,  V value,  boolean onlyIfAbsent) {

    lock();// 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap

     try {

         int c =  count;

         if (c ++ >  threshold) { // 如果超过再散列的阈值

             int reduced = rehash();// 执行再散列,table 数组的长度将扩充一倍

             if (reduced >  0) { 
                 count = (c -= reduced) -  1;  // write-volatile
            } 
        } 

        HashEntry< K,  V>[] tab =  table;

        // 把散列码值与 table 数组的长度减 1 的值相“与”  得到该散列码对应的 table 数组的下标值

         int index = hash & tab. length -  1;

        HashEntry< K,  V> first = tab[index];// 找到散列码对应的具体的那个桶

        HashEntry< K,  V> e = first;

         while (e !=  null && (e. hash != hash || !keyEq(key, e.key()))) {//找到是否存在相同的Key(hash码一样&&key值相等)

            e = e. next; 
        } 

         V oldValue; 
         if (e !=  null) { 
            oldValue = e.value(); 
             if (!onlyIfAbsent) { 
                e.setValue(value); 
            } 
        }  else { 
            oldValue =  null;

            ++  modCount;// 要添加新节点到链表中,所以 modCont 要加 1

            tab[index] = newHashEntry(key, hash, first, value);// 创建新节点,并添加到链表的头部

             count = c;  // write-volatile  写 count 变量

        } 
         return oldValue;

    }  finally {

        unlock();//解锁

    } 
}

再看看rehash方法

int rehash() { 
    HashEntry< K,  V>[] oldTable =  table; 
     int oldCapacity = oldTable. length; 
     if (oldCapacity >=  MAXIMUM_CAPACITY) { 
         return  0; 
    }  
    //数组长度扩大一倍
    HashEntry< K,  V>[] newTable = HashEntry.newArray(oldCapacity <<  1); 
     threshold = ( int) (newTable. length *  loadFactor); 
     int sizeMask = newTable. length -  1; 
     int reduce =  0; 
     for ( int i =  0; i < oldCapacity; i ++) {

         // 必须保证现有的map可以继续读,所以我们不能清空每个槽

        HashEntry< K,  V> e = oldTable[i]; 

         if (e !=  null) { 
            HashEntry< K,  V> next = e. next; 
             int idx = e. hash & sizeMask;

             // 如果只有单个节点

             if (next ==  null) { 
                newTable[idx] = e; 
            }  else {

                 // 先把链表末端的节点移到新的槽位,然后把其余的节点克隆到新的槽位 ,rehash之后idx相同的元素

                HashEntry< K,  V> lastRun = e; 
                 int lastIdx = idx;

                 for (HashEntry< K,  V> last = next; last !=  null; last = last. next) {

                     int k = last. hash & sizeMask;//计算出在新table中的hash值

                     if (k != lastIdx) { 
                        lastIdx = k; 
                        lastRun = last; 
                    } 
                } 
                newTable[lastIdx] = lastRun;

                 // 复制剩下的所有HashEntry

                 for (HashEntry< K,  V> p = e; p != lastRun; p = p. next) { 
                     // Skip GC'd weak references
                     K key = p.key(); 
                     if (key ==  null) { 
                        reduce ++; 
                         continue; 
                    } 
                     int k = p. hash & sizeMask; 
                    HashEntry< K,  V> n = newTable[k]; 
                    newTable[k] = newHashEntry(key, p. hash, n, p.value()); 
                } 
            } 
        } 
    } 
     table = newTable; 
     return reduce; 
}

可以看到rehash()方法就是把table数组扩容一倍,再把原来的引用计算出在新数组的位置e.hash & sizeMask ,然后放过去,原来旧的table数组就交给垃圾回收

 注意:这里的加锁操作是针对(键的 hash 值对应的)某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。 因为插入键 / 值对操作只是在这个 Segment 包含的某个桶中完成,不需要锁定整个ConcurrentHashMap。此时,其他写线程对另外 15 个Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)

相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作, ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作

(如果并发级别设置为 16),及任意数量线程的读操作。

6.get方法

segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,算法跟put一样,这里就不说了。

判断count,因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,

也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

V get(Object key,  int hash) { 
     if ( count !=  0) {  // read-volatile
        HashEntry< K,  V> e = getFirst(hash);

         while (e !=  null) {

             if (e. hash == hash && keyEq(key, e.key())) {//hash值和key值同事相等

                 V opaque = e.value(); 
                 if (opaque !=  null) { 
                     return opaque; 
                }

                 return readValueUnderLock(e);  // 再次检查,防止其他线程在修value

            } 
            e = e. next; 
        } 
    } 
     return null; 
}

通过hash值来算出HashEntry的index。然后得到对应的HashEntry

HashEntry< K,  V> getFirst( int hash) {

    HashEntry< K,  V>[] tab =  table;

     return tab[hash & tab. length -  1];

}

如果key对应的value为null时,而此时如果另外一个线程正在改变节点值,这样会导致数据不一致,所以给e上锁再读一遍,以保证得到的是正确的值

V readValueUnderLock(HashEntry< K,  V> e) { 
    lock(); 
     try { 
         return e.value(); 
    }  finally { 
        unlock(); 
    } 
}

7.remove 方法

  remove操作的前面一部分和前面的get和put操作一样,整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现:

V remove(Object key,  int hash, Object value,  boolean refRemove) {

    lock();//上锁

     try { 
         int c =  count -  1; 
        HashEntry< K, V>[] tab =  table; 
         int index = hash & tab. length -  1;//计算index

        HashEntry< K, V> first = tab[index];//获取要删除的元素所在的HashEntry

        HashEntry< K, V> e = first;

         while (e !=  null && key != e. key && 
                (refRemove || hash != e. hash || !keyEq(key, e.key()))) {//遍历找到要删除 
            e = e. next; 
        }

        //以上就是定位到要删除的节点e

        //接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。

        V oldValue =  null; 
         if (e !=  null) { 
            V v = e.value(); 
             if (value ==  null || value.equals(v)) {

                oldValue = v;           

                ++  modCount; 
                HashEntry< K, V> newFirst = e. next; 
                 for (HashEntry< K, V> p = first; p != e; p = p. next) { 
                     K pKey = p.key(); 
                     if (pKey ==  null) {  // Skip GC'd keys
                        c --; 
                         continue; 
                    } 

                    newFirst = newHashEntry( 
                            pKey, p. hash, newFirst, p.value()); 
                } 
                tab[index] = newFirst; 
                 count = c;  // write-volatile
            } 
        } 
         return oldValue; 
    }  finally { 
        unlock(); 
    } 
}

首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了, 我们之前已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍, 然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程: 假设写线程执行 remove 操作,

下面是个示意图

 删除元素之前:

ConcurrentHashMap浅析ConcurrentHashMap浅析

删除元素之后:

ConcurrentHashMap浅析ConcurrentHashMap浅析

第二个图其实有点问题,复制的结点中应该是值为2的结点在前面,值为1的结点在后面,也就是刚好和原来结点顺序相反。

整个remove实现并不复杂,但是需要注意如下几点。

第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。

第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,

          直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

注意:它们在新链表中的链接顺序被反转了。 在执行 remove 操作时,原始链表并没有被修改,也就是说:读线程不会受同时执行 remove 操作的并发写线程的干扰。 综合上面的分析我们可以看出,写线程对某个链表的结构性修改不会影响其他的并发读线程对这个链表的遍历访问。

关键点总结:

1: 分段锁,segement继承ReentrantLock

2: 巧妙使用volatie、final

3: 两种计算Index方式(一个是segment,一个是HashEntry)

4: Put,remove会对当前这个segment上锁,对其他的segement不会影响

5: get一般情况没锁,只有当get的value=null时候,会上锁


size

public int size() { 
     // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
     final Segment< K, V>[] segments =  this. segments; 
     int size; 
     boolean overflow;  // true if size overflows 32 bits
     long sum;          // sum of modCounts
     long last =  0L;    // previous sum
     int retries = - 1;  // first iteration isn't retry
     try {

         for (;;) {

             if (retries++ ==  RETRIES_BEFORE_LOCK) { 
                 for ( int j =  0; j < segments. length; ++j) 
                    ensureSegment(j).lock();  // force creation
            } 
            sum =  0L; 
            size =  0; 
            overflow =  false; 
             for ( int j =  0; j < segments. length; ++j) { 
                Segment< K, V> seg = segmentAt(segments, j); 
                 if (seg !=  null) { 
                    sum += seg. modCount; 
                     int c = seg. count; 
                     if (c <  0 || (size += c) <  0) 
                        overflow =  true; 
                } 
            } 
             if (sum == last) 
                 break; 
            last = sum; 
        } 
    }  finally { 
         if (retries >  RETRIES_BEFORE_LOCK) { 
             for ( int j =  0; j < segments. length; ++j) 
                segmentAt(segments, j).unlock(); 
        } 
    } 
     return overflow ? Integer. MAX_VALUE : size;

}

首先就是判断重试次数是否等于2,从-1开始的

     第1种情况:等于2就把map中的每个segment上锁,然后累加每个锁中的count,累加完成了再解锁,最后返回结果

     第2种情况:不等于的情况,第一次先不上锁遍历累加modCount,第二次不上锁遍历累加modCount,如果与第一次累加的结果相等则返回。如果不等,那第三次遍历累加,累加结果相等跟前面一样。

        如果第二次和第三次累加结果不想等,则此时重试次数已经等于2了,所以又回到第一种情况

     为什么modCount来累加判断呢?因为ConcurrentHashMap在put、remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。


上一篇:java集合


下一篇:基于Promethues与Grafana的Greenplum分布式数据库监控的实现