【Java集合】ConcurrentHashMap(1.7)源码解析

ConcurrentHashMap可以视为线程安全版的HashMap,以下简称CHM,在了解CHM之前最好先对HashMap有一定的了解:

【HashMap(1.7)源码解析】

一、CHM起步

1. HashMap与HashTable存在的问题

  • HashMap:HashMap在多线程环境下可能会造成循环节点,导致CPU利用率接近100%,所以在高并发的环境下不能使用HashMap;

  • HashTable:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

    public synchronized V put(K key, V value) {
        // ...
    }
    

2. CHM数据结构

CHM和HashMap的思想是差不多的,但是由于需要支持并发操作,所以实现要更为复杂些。整个CHM由多个Segment组成,很多文章将其描述为分段锁;可以简单理解,CHM是一个Segment数组,每个Segment指向了HashMap。而Segment本身是通过继承ReentrantLock来进行加锁的,所以每次操作只需要锁住一个Segment而不用去锁住整个CHM对象,这样也就能保证在并发操作时的安全与高效,其数据结构大概如下:
【Java集合】ConcurrentHashMap(1.7)源码解析

记住这个数据结构,接下来我们就来对源码进行分析。

二、CHM源码解析

1. 类结构定义与成员变量

CHM 继承了AbstractMap并且实现了ConcurrentMap接口,代码如下:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable 

与HashMap相比较,CHM中添加了两个属性用于定位Segment,分别是segmentMask 和 segmentShift;此外,不同与HashMap中直接定义一个Entry数组的table,CHM底层结构则是一个Segment数组,源代码如下:

/**
  * Mask value for indexing into segments. The upper bits of a
  * key's hash code are used to choose the segment.
  * 用于定位段,大小等于segments数组的大小减 1,是不可变的
  */
final int segmentMask;

/**
  * Shift value for indexing within segments.
  * 用于定位段,大小等于32(hash值的位数)减去对segments的大小取以2为底的对数值,是不可变的
  */
final int segmentShift;

/**
  * The segments, each of which is a specialized hash table
  */
final Segment<K,V>[] segments;

2. Segement段的定义

Segment类继承了ReenterLock类,从而来完成锁相关的操作。每个Segment对象用来维护它的成员对象table(即HashEntry数组)中的若干个桶;

count变量用来记录table数组中所包含的HashEntry对象的个数。之所以将这个count定义在Segment中而不是整个CHM上,是因为这样在更新count的时候不需要去锁定整个CHM,而只是锁定当前的Segment对象即可,大大提高了效率。同时count变量还使用的volatile修饰,这使得只要当前线程的count发生改变,该操作就会对其他线程立即可见;

modCount用于统计段进行操作的的次数,该变量是为了检测对多个段进行遍历过程中某个段是否发生改变;

threashold用来表示Segment段内的table需要进行扩容与重哈希的阈值;

loadFactor表示Segment段的负载因子,其值等同于CHM的负载因子的值;

table是一个典型的链表数组,而且也是volatile的,这使得对table的任何更新对其它线程也都是立即可见的;

Segment类的源码定义如下所示:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    /**
     * The number of elements in this segment's region.
     * Segment中元素的数量
     */
    transient volatile int count;

    /**
     * Number of updates that alter the size of the table. This is
     * used during bulk-read methods to make sure they see a
     * consistent snapshot: If modCounts change during a traversal
     * of segments computing size or checking containsValue, then
     * we might have an inconsistent view of state so (usually)
     * must retry.
     * 对count的大小造成影响的操作的次数(比如put或者remove操作)
     */
    transient int modCount;

    /**
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     * 阈值,段中元素的数量超过这个值就会对Segment进行扩容
     */
    transient int threshold;

    /**
     * The per-segment table.
     * 链表数组
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * The load factor for the hash table.  Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     * 段的负载因子,其值等同于ConcurrentHashMap的负载因子
     */
    final float loadFactor;

    ...
}

到这里我们可以知道,CHM的思想与HashTable有部分一致。HashTable在涉及到安全问题的操作方法时,就将整个对象加锁,从而避免了安全问题,却也导致了效率的低下。而CHM则利用Segment分段锁,每个Segment其实就是一个HashTable(这么说当然不严谨,就当比对吧),当有两个线程来操作同一个Segment中的数据时,就对Segment加锁,那么就只剩一个线程在访问当前的Segment段了,但是其实其他Segment并没有被加上锁,所以其他Segment段还是可以被访问,这样就大大地提升了效率。

3. HashEntry链表节点类

HashEntry与HashMap中的Entry类几乎相识,一样维护了key、value、hash、以及指向下一节点的next指针;

static final class HashEntry<K,V> {
    final K key;                  
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;

    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
        this.key = key;
        this.hash = hash;
        this.next = next;
        this.value = value;
    }

    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V>[] newArray(int i) {
        return new HashEntry[i];
    }
}

不同于HashMap中的Entry的是,HashEntry中的key、hash、next域都被声明为final、而value则被volatile修饰,因此HashEntry对象几乎是不可变的,这也是CHM读操作并不需要加锁的一个重要原因;

next域被声明为final意味着不能从HashEntry链的中间或者尾部添加或删除新的节点,所有节点的修改操作都必须从头部开始;对于put操作,可以一律添加到HashEntry链的头部;而对于remove操作,这需要从中间删除一个节点,这个时候就无法进行了,只能将要删除的节点前面的所有节点都复制一遍(重新new一个),然后前面最后一个节点去指向要删除的节点的下一个节点;除此之外,由于value被volatile修饰,所以可以确保其被读线程所读到的值都是最新的值;

通过上述说明,我们可以知道CHM完全允许多个读操作并发进行,读操作并不需要加锁。

4. CHM构造函数

构造函数的工作主要就是判断传入的初始化容量,负载因子,并发级别是否符合要求;并发级别就是同时允许多少个修改线程对CHM对象进行操作,也就是需要多少个Segment,即声明Segment数组的长度;之后再通过初始化容量/segment数组的长度求出每个segment所对应的table的长度;最后初始化segments数组。代码如下所示:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // loadFactor不能小于等于0,初始容量不能小于0,并发级别不能小与等于0,并发级别就是声明要有多少个segment段
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();

    if (concurrencyLevel > MAX_SEGMENTS)              
        concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    int sshift = 0;            // 大小为 lg(ssize) 
    // 段的数目,segments数组的大小(2的幂次方)
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 用于定位段
    segmentShift = 32 - sshift;
    // 用于定位段
    segmentMask = ssize - 1;
    // 创建segments数组
    this.segments = Segment.newArray(ssize);

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 总的桶数/总的段数
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 每个Segment段所拥有的桶的数目(2的幂次方)
    int cap = 1;
    while (cap < c)
        cap <<= 1;

    // 初始化segments数组
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

5. CHM存储元素

我们先来看一下CHM的存储元素的过程,与读操作不一样,像put、remove这种修改操作是进行加锁操作的,具体看源码分析:

public V put(K key, V value) {
    // key值非空判断
    if (value == null)
        throw new NullPointerException();
    // 计算hash值
    int hash = hash(key.hashCode());
    // segmentFor(hash)计算要存储在哪个segment段中,然后再通过put操作添加到指定的segment中
    return segmentFor(hash).put(key, hash, value, false);
}

从上面源码我们可以看出,CHM不同于HashMap,它不允许key值为null。此外,我们可以看到,我们要存储的键值对首先需要被计算得出应该要存储在哪个segment中,然后再将put操作委托于特定的segment来进行,我们先来看看segmentFor()方法的源代码:

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

segmentFor( )方法根据传入的hash值无符号右移segmentShift位,然后和segmentMask进行与操作就可以定位到特定的段。这里与HashMap中的indexFor()函数有异曲同工之处。

Segment的数量(segments数组的长度)是2的n次方(参阅构造函数),那么segmentShift的值就是32-n(hash值的位数是32位),而 segmentMask 的值就是 2^n-1(写成二进制的形式就是n个1)。这一点操作可以实现键值对在segment数组中的较为均匀地发布,可以参考HashMap中的indexFor()方法;

计算完该键值对应该存储于哪个段中后,就调用这个段的 put( ) 方法来将键值对插到该段中,Segment的put( )方法的源码如下所示:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 上锁
    lock();
    try {
        int c = count;
        // 判断本次添加键值对后是否超过阈值,超过则调用rehash()方法
        if (c++ > threshold) // ensure capacity
            rehash();
        // table是Volatile修饰的
        HashEntry<K,V>[] tab = table;
        // 定位到段中特定的桶
        int index = hash & (tab.length - 1);
        // first指向桶中链表的表头
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;

        // 检查该桶中是否存在相同key的结点
        while (e != null && (e.hash != hash || !key.equals(e.key)))  
            e = e.next;

        V oldValue;
        if (e != null) {
            // 该桶中存在相同键值对
            oldValue = e.value;
            if (!onlyIfAbsent)
                 // 更新value值
                e.value = value;
        }else {
            // 该桶中不存在相同键值对
            oldValue = null;
            // 结构性修改,modCount加1
            ++modCount;
            // 创建HashEntry并将其链到表头
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            // write-volatile,count值的更新一定要放在最后一步(volatile变量)
            count = c;
        }
         // 返回旧值(该桶中不存在相同key的结点,则返回null)
        return oldValue;
    } finally {
        // 解锁
        unlock();
    }
}

从源码中可以看出,在Segment中的 put 操作是需要加锁的,而Segment本身继承了ReenterLock类,所以就是一个可重入的Lock锁对象,可以调用lock() 与 unlock()函数对同步代码段进行加锁与解锁。从而这里我们也可以证明,CHM中的加锁操作是针对于单个Segment段,而不是整个CHM。在实际运行过程中,CHM 可以支持 16 个线程执行并发写操作(concurrencyLevel默认设置为 16),以及任意数量的读线程。

接下来我们来看同步代码块中的代码,在键值对插入到Segment中,首先会去判断当前segment中的键值对数量+1是否超过了阈值,超过就进行扩容与rehash。否则继续往下运行,通过hash & (tab.length - 1)的操作定位当前键值对所对应的tab下标位置index,获取index处的HashEntry对象,之后进行循环判断,判断当前要插入的键值对是否存在于该桶中,存在就覆盖原来的value值,否则就插入新的值(头插法);

6. CHM容器扩容

上面的Segment中的put函数我们发现有如下代码:

if (c++ > threshold) // ensure capacity
    rehash();

这行代码就是检查本次插入会不会使得Segment段中的HashEntry对象的数量超过阈值,如果超过,那么就进行扩容与重哈希操作,接下来我们就来看看rehash( )函数的源代码:

void rehash() {
    // 扩容前的table
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 判断table是否已经达到最大长度,是就直接返回
    if (oldCapacity >= MAXIMUM_CAPACITY)
        return;

    // 新创建一个 newtable,其容量是原来的2倍
    HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
    // 新的阈值
    threshold = (int)(newTable.length * loadFactor);
    // 用于定位桶的位置
    int sizeMask = newTable.length - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        // 依次指向旧table中的每个桶的链表表头
        HashEntry<K,V> e = oldTable[i];  
        // 旧table的该桶中链表不为空
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 重哈希已定位到新桶
            int idx = e.hash & sizeMask;
            //  旧table的该桶中只有一个节点
            if (next == null)
                newTable[idx] = e;
            else {    
                // Reuse trailing consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    // 寻找k值相同的子链,该子链尾节点与父链的尾节点必须是同一个
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }

                // JDK直接将子链lastRun放到newTable[lastIdx]桶中
                newTable[lastIdx] = lastRun;

                // 对该子链之前的结点,JDK会挨个遍历并把它们复制到新桶中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    int k = p.hash & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                     n, p.value);
                }
            }
        }
    }
    // 将table指向新表
    table = newTable;
}

rehash()函数的执行流程如下:首先判断当前segment中维护的table长度是否已经达到了最大值(2的30次方),是就不再扩容,直接返回。否则就进行扩容操作,扩容的思路就是重新创建一个新的table数组,长度为原来的两倍。之后就是将原来table中的元素重新复制到新的table中,最终再将Segment中维护的table指向新的table。

7. CHM读取元素

CHM中读取元素的操作与插入元素的操作流程有一定的相似,都是需要先去定位当前键值对属于哪个Segment,代码如下:

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

确定当前键值对属于特定的Segment后,就调用该segment的get函数,函数代码如下:

V get(Object key, int hash) {
    // read-volatile,首先读 count 变量
    if (count != 0) {
        // 通过hash确定键值对在哪个桶,返回桶中链表头结点
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            // 查找链中是否存在指定Key的键值对
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                // 如果读到value域不为 null,直接返回
                if (v != null)
                    return v;   
                // 如果读到value域为null,说明发生了重排序,加锁后重新读取
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    // 如果不存在,直接返回null
    return null;
}

这段代码相对简单,就是去判断当前键值对对应的桶下标,返回桶中的链表的头结点,循环判断键值对是否存在与当前链中,如果不存在就返回null,如果存在,并且读取到的值不为null,那么就返回读取到的值,如果读取到的值为null,那么说明发生了重排序,就需要进行加锁重读了,加锁重读的代码如下:

V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

通过存储元素的源代码我们呢知道CHM不同于HashMap,它是不允许key值为null的,同时也不允许value值为null。但是此处读取元素却出现键值对存在但value值为null的情况,对此JDK给出的解释是,这是由于初始化 HashEntry 时发生了指令重排序导致的,也就是在HashEntry初始化完成之前便返回了它的引用(有其他线程在操作这个Segment段)。这时,JDK给出的解决方式就是加锁重读,即上面的函数。

8. CHM跨段操作

上面介绍的函数都是在同一个Segment中进行操作的,但是CHM中也会有一些函数涉及到了多个Segment,这里称为跨段操作,像是size()containsValue(V value)需要对整个CHM进行扫描的函数。这里我们以size()函数为示例,介绍下多线程环境下的跨段操作,size()的源代码如下:

public int size() {
    final Segment<K,V>[] segments = this.segments;
    long sum = 0;
    long check = 0;
    int[] mc = new int[segments.length];
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        check = 0;
        sum = 0;
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;   
            // 在统计size时记录modCount用以判断
            mcsum += mc[i] = segments[i].modCount;
        }
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                check += segments[i].count;
                // 统计size后比较各段的modCount是否发生变化,发生变化代表被其他线程操作过,重新循环
                if (mc[i] != segments[i].modCount)
                    check = -1; // force retry
                    break;
                }
            }
        }
        // 如果统计size前后各段的modCount没变,且两次得到的总数一致,说明没有线程进行过操作,直接返回
        if (check == sum)
            break;
    }
    if (check != sum) { // Resort to locking all segments
        sum = 0;
        // 对所有segment进行加锁
        for (int i = 0; i < segments.length; ++i)
            segments[i].lock();
        // 统计
        for (int i = 0; i < segments.length; ++i)
            sum += segments[i].count;
        // 对所有segment进行解锁
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    if (sum > Integer.MAX_VALUE)
        return Integer.MAX_VALUE;
    else
        return (int)sum;
}

size( )的运行过程如下:首先先在没有锁的情况下对所有Segment的count进行求和操作,这种求和操作最多执行RETRIES_BEFORE_LOCK次(默认是两次):在没有达到RETRIES_BEFORE_LOCK之前,求和操作会不断尝试执行(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),除非没有其他线程对CHM对象进行结构性操作(通过modCount判断);在超过RETRIES_BEFORE_LOCK之后,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。如果执行RETRIES_BEFORE_LOCK次后还是无法得出准确的count值,那么再采用加锁的方式来统计所有Segment的大小。

上一篇:创建chm格式文件


下一篇:拆包粘包问题的解决方案