JUC:ConcurrentHashMap

ConcurrentHashMap

看前

本博客只是为了记录,学习流程,学习源码其大部分看的是ConcurrentHashMap 1.8 源码分析这位大佬的博客,写的十分详细,图文并茂非常易懂,十分推荐。我这篇博客只是在他的基础上加了点自己的理解(肯定有不少理解不到位的地方)以及查询了一些没讲到的函数。

介绍

源码: java.util.concurrent.ConcurrentHashMap
HashMap有线程安全问题,导致出现链表死循环的情况。HashTable和ConcurrentHashMap没有线程安全问题,但二者上锁的方式不同,一个是HashTable全局一把锁,使用的是synchronized,ConcurrentHashMap是分段锁只锁住一个SegMent对象,其Segment对象继承于ReentrantLock。
JDK7: segment+Hashentry+reentrantlock
JDK8: synchronized+CAS
底层数据结构JDK7是segment数组+hashentry链表,JDK8是Node,treebin(红黑树),导致了二者的遍历时间复杂度有时不同。

源码分析

有了前面的HashMap基础,我们在讲解ConcurrentHashMap时只讲解二者不同之处。

主要属性

private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED     = -1; // 表示正在转移
static final int TREEBIN   = -2; // 表示已经转换成树
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
private transient volatile int sizeCtl;

我们可以发现相对于HashMap新加了一个sizeCtl,是一个非常重要的变量。它的意义可以解释为。

  • sizeCtl :
    • 默认为0,用来控制table的初始化和扩容操作
    • -1 代表table正在初始化
    • -N 表示有N-1个线程正在进行扩容操作
    • 其余情况:
      1. 如果table未初始化,表示table需要初始化的大小。
      2. 如果table初始化完成,表示table的扩容阈值,默认是table大小的0.75倍

希望在看源码前对基本的构成心中有大概的轮廓,看代码才会更加轻松
JUC:ConcurrentHashMap

内部类

Node

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

相较于HashMap的Node实现,ConcurrentHashMap的Node需要在子类中实现setValue,并且实现了该节点往后节点值得寻找方法find。

TreeNode

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

        /**
         * Returns the TreeNode (or null if not found) for the given key
         * starting at given root.
         */
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

与HashMap不同,HashMap的TreeNode继承于LinkedHashMap.Entry<K,V>,并且使用了上百行代码对其进行了完整实现。但在ConcurrentHashMap中的TreeNode只是继承于Node,并且重写了find方法。对于树的完整实现放到了哪里呢?在ConcurrentHashMap中放到了TreeBin中。

TreeBin

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。由于代码过多我们只贴出来他的构造函数。

TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

我们可以看到和HashMap一样熟悉的建立红黑树的操作。

ForwardingNode

在执行transfer操作期间,作为桶位的头结点。

三个原子操作

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • tabAt():获得在i位置上的Node节点
  • casTabAt():利用CAS算法设置i位置上的Node节点。
  • setTabAt():设置节点位置的值。
    而casTabAt()与setTabAt()方法的区别:
    所以真正要进行有原子语义的写操作需要使用casTabAt()方法,setTabAt()是在锁定桶的状态下需要使用的方法,如此方法实现只是带保守性的一种写法而已。

讲解的主要方法

//无参构造
public ConcurrentHashMap(){}
//设定容量
public ConcurrentHashMap(int initialCapacity){}
//将map转化为CHM
public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
//对table进行初始化
private final Node<K,V>[] initTable()
//增
public V put(K key, V value)
//在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树
private final void treeifyBin(Node<K,V>[] tab, int index)
数组扩容的主要方法就是transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
//查
public V get(Object key)
//删除
public V remove(Object key)

另外在讲解主要方法时会顺便讲解其调用的方法。

初始化

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //table初始化的操作只能有一个线程执行,sizectl<0说明有另外一个线程正在初始化。那么本线程的初始化资格就让出去,不再执行后序的初始化操作。
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
                //CAS将cs改为-1,表示本线程正在初始化。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);//sizeCtl更新为容量的0.75
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
初始化方法主要应用了关键属性sizeCtl 如果这个值<0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。

构造函数

public ConcurrentHashMap() {
    }
public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

有了HashMap基础前两个好理解,不过第二个设定initialCapacity时,在最后将sizeCtl设置为cap。第三个将sizeCtl设置为初始容量。不用担心sizeCtl会在后序中进行改变。我们来看一下他调用的putAll方法。

public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

其中tryPresize

private final void tryPresize(int size) {
		//由于有容量2的次幂限制,将容量设为大于等于size的2的次幂值。与HashMap一样该功能由tableSizeFor实现。
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //若table还未初始化,则进行初始化。
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                //此线程扩容期间,不允许其他线程进行扩容。
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //c没有达到扩容阈值或本身长度已经到达最大值,则不扩容,直接退出。
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            //初始化了,准备将值放到新的table中。
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

由于水平原因,笔者在这里遇到了个问题。
问题:31-39行代码永远不会执行。
解释:在6行对sc进行了赋值,进入while循环的都是sc>=0的情况。从while到执行sc<0之前的期间没有出现对sc重新赋值的情况,那么运行到sc<0的if判断时,一定为false。那么作者写这段代码的含义是什么?
下面讲解一下resizeStamp(n)函数,他执行之后,rs的二进制位表示中低16位为1,0-15位存储n二进制表示的最高非0位前0的个数值。不明白的看下面的博客讲解。
ConcurrentHashMap的源码分析-resizeStamp
结合上面的提示可以理解程序的执行流程,这里重点提一下40行的(rs << RESIZE_STAMP_SHIFT) + 2),CAS将SizeCtl的值变为(rs << RESIZE_STAMP_SHIFT) + 2),他的值一定为负数,因为rs的16位为1,在左移16位,那符号位为1,通过原子操作将SizeCtl变为相应的负数,并且创建一个两倍长度的新table,并赋值到nextTable上。下面我们只需要搞清楚一点如何将数据迁移到newtable中。
这是该类最难的transfer函数

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
    //若nextTab==null则建立newtable
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩张1倍的空间。
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //老数据存在0-n-1,所以下一个索引为n
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
    //玄幻处理,i表示当前处理的桶下标。bound表示分得区间的下界。
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //控制桶的序号
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //判断是否结束
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //当前桶位空。插入一个fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //当前桶已经迁移
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //迁移桶内数据
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //处理节点是链表
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //处理节点是红黑树
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

transfer可以拆分为这几个部分

  • 根据CPU数划分table的步长:

    • 若为单核CPU,步长为table.length
    • 若为多核CPU,步长为max((n >>> 3) / NPC, MIN_TRANSFER_STRIDE)
  • 若传入的nextTab为null,创建nextTable

    • 将创建的nextTab赋给全局变量nextTable
    • 将用于控制扩容区间的分配transferIndex初始化为table.length
  • 开始循环处理区间内各个桶的迁移,由advance控制程序。

    • while循环对区间[bound, i]进行分配或调整(–i)(27-43)
    • 开始下面四个分支
      • i越界:判断当前区间的迁移是否结束;(45-59)
      • 当前桶位null,插入一个fwd
      • 当前桶已经迁移,跳过
      • 其他:迁移桶内数据
        • 链表(72-101)
        • 红黑树(103-137)
  • 27-43:控制区间

while (advance) {
    int nextIndex, nextBound;
    // 选择下一个桶或处理结束信号
    if (--i >= bound || finishing)
        advance = false;
    // 没有新的区间可以分配
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // 分配区间
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

while循环求得本次这一次多线程处理的区间范围。第一次循环1,2判断语句都不成立,进入第三个,生成第一个区间[bound,i]其中bound为nextBound,nextBound在cas中被改为nextIndex-stride,所以第一次区间为[nextIndex-stride, nextIndex-1],更新。下一次循环进入第一个判断语句,得到当前处理的i后,退出while,最后一次循环进入第二条判断语句,从后往前遍历到头了。

  • 45-59:结束条件
// i<0时,已经完成扩容,暂不知道(i>=n || i+n>=nextn)的含义
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    // finishing为true则结束
    if (finishing) {
        nextTable = null; // 置空
        table = nextTab; // 替换
        sizeCtl = (n << 1) - (n >>> 1); // sizeCtl设为0.75倍的table.length
        return;
    }
    // finishing为false,但是该线程的任务已经完成,sizeCtl中的线程数减1
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // 判断sizeCtl是否回到刚开始扩容的状态
        // 是则说明所有用于迁移的线程都结束工作,否则直接返回
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        // finishing置为true
        finishing = advance = true;
        // i置为table.length,全部扫描一遍,确定所有桶都已经完成迁移
        i = n; // recheck before commit
    }
}

当i<0时,当前线程已经完成了table最后一个区间,对finishing进行判断:

  • finishing为true:

    • 全局变量nextTable为空
    • 替换table
    • 设置sizeCtl
  • finishing为false:

    • 原子操作对sizeCtl中线程数-1
    • 判断是否所有线程都结束工作,否则返回,是则继续
    • finishing为true
    • i为table.length,下一个循环开始全局检查。
  • 72-101:迁移链表

// 上锁,与putVal()同步,f为当前桶的头节点
synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        // hash>0为链表,树节点的hash=-2
        if (fh >= 0) {
            // 决定移动到低位桶还是高位桶
            int runBit = fh & n;
            Node<K,V> lastRun = f;
            // 找到最后将迁移到同一个桶的所有节点,
            // 这部分不需要创建新的节点,而是直接迁移
            for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                if (b != runBit) {
                    runBit = b;
                    lastRun = p;
                }
            }
            if (runBit == 0) {
                ln = lastRun;
                hn = null;
            }
            else {
                hn = lastRun;
                ln = null;
            }
            // 根据每个节点的hash值迁移,由于节点中的key是不可变的,需要创建新的节点
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                    // 头插法,创建新节点时把自己作为next节点传入
                    // 最后链表的顺序将会颠倒(除了lastRun之后的)
                    ln = new Node<K,V>(ph, pk, pv, ln); 
                else
                    // 同上
                    hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln); // 原子操作把低位桶置入新表
            setTabAt(nextTab, i + n, hn); // 原子操作把高位桶置入新表
            setTabAt(tab, i, fwd); // 原子操作把fwd置入旧表表示已经迁移
            advance = true;
        }
        else if (f instanceof TreeBin) {
            ...
        }
    }
}

与HashMap扩容一样,链表中要分为两支队伍,根据更高一位是0 or 1来分队伍。在这里用头插法,然后将两队挂到不同的对应索引下。第一个for循环用来找到并构建其中一个新链表的最后位置(因为第二个for中用到的是头插法构建新链表),也是后面for循环的终止条件。

  • 103-137:迁移红黑树
synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        if (fh >= 0) {
            ...
        }
        // 节点为红黑树
        else if (f instanceof TreeBin) {
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null;
            TreeNode<K,V> hi = null, hiTail = null;
            int lc = 0, hc = 0;
            // 遍历
            for (Node<K,V> e = t.first; e != null; e = e.next) {
                int h = e.hash;
                TreeNode<K,V> p = new TreeNode<K,V>
                    (h, e.key, e.val, null, null);
                // 和链表相同的判断,与运算==0的放在低位
                if ((h & n) == 0) {
                    if ((p.prev = loTail) == null)
                        lo = p;
                    else
                        loTail.next = p;
                    loTail = p;
                    ++lc;
                } // 不是0的放在高位
                else {
                    if ((p.prev = hiTail) == null)
                        hi = p;
                    else
                        hiTail.next = p;
                    hiTail = p;
                    ++hc;
                }
            }
            // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
            (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
            (lc != 0) ? new TreeBin<K,V>(hi) : t;
            // 低位树
            setTabAt(nextTab, i, ln);
            // 高位数
            setTabAt(nextTab, i + n, hn);
            // 旧的设置成占位符
            setTabAt(tab, i, fwd);
            // 继续向后推进
            advance = true;
        }
    }
}

前面和HashMap中的split一致,逻辑也一致,只是使用了原子操作将两条链表或树挂到对应位置。

插入

主函数

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

注意:hash值进行了更新操作为 hashcode ^ (hashcode >>> 16)) & HASH_BITS
函数流程:

  • table未被初始化,初始化table
  • table已经初始化,但是对应桶为空,新建Node加入,跳出
  • table已经初始化,且相应桶不为空,但桶正在迁移,当前线程帮助迁移。
  • table已经初始化,且相应桶不为空,且该桶未在迁移,将键值对加入桶中。
    • 桶中是链表,遍历这个链表:
      • 若存在:根据onlyIfAbsent修改value,跳出
      • 若不存在创建新的node挂到尾部,跳出
    • 桶中是红黑树: 通过TreeNode中putTreeValue写入。
      上面函数没有讲过的有treeifBin(),addCount()。二者都包含了扩容操作,但是扩容的结果是不同的。
      将treeifyBin中使用tryPresize进行的扩容为P扩容。
      将addCount进行的扩容称为C扩容。

treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 扩容
            tryPresize(n << 1); 
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 转化红黑树
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

如果插入一个键值对后,链表长度大于等于 TREEIFY_THRESHOLD,就需要进行扩容或转化红黑树。treeifyBin() 是这一操作的入口,首先判断 table 容量是否小于 MIN_TREEIFY_CAPACITY,如果是则进行 P 扩容(由 tryPresize() 进行的,有别于下文中由 addCount() 进行的),否则进行红黑树转化。

addCount

private final void addCount(long x, int check) {
    // 计数部分,暂不考虑
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //CAS失败进入fulladdCount(x,false),否则fullAddCount(x,true)
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
    // 判断是否C扩容
    if (check >= 0) { 
        Node<K,V>[] tab, nt; int n, sc;
        // 满足三个条件则C扩容:1.size达到sizeCtl;2.table非空;3.table长度非最大值
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 以下为C扩容,和tryPresize()中的一个分支完全一致
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

20行之前addCount通过CounterCell数组计算键值对个数,后面对check进行判断若其>=0,且

  1. 键值对数达到szieCtl
  2. table非空
  3. table长度并非最大值。
    进行C扩容。下面的结构与tryPresize中的一个分支完全一致。

计数

主要讲解的就是addCount前20行实现。
通过 size() 可以获得当前键值对的数量,它将 sumCount() 获得的 long 类型的值转化为 int 返回。
sumCount() 则计算 baseCount 字段与 counterCells 数组中所有非空元素的记录值的和。

// 未发生争用前都用它计数
private transient volatile long baseCount;
// 用于同步counterCells数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// 支持多个线程同时通过counterCells中的多个元素计数
private transient volatile CounterCell[] counterCells;
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 : 
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 
            (int)n); 
}
// 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

CHM将每个桶中的键值对数量保存在baseCount和CounterCell 数组 counterCells 中。在什么地方对baseCount 和 counterCells进行赋值的呢?在addCount前20行。
addCount()中有两层if,第一层:

  • counterCells不为空:跳转到第二层if;
  • counterCells为空:CAS操作增加baseCount,成功结束if,失败进入二层if
    第二层if创建counterCells
  • counterCells==null || couterCells[线程hash]==null,调用fullAddCount(x,true)
  • couterCells[线程hash]!=null,CAS更改cellvalue值
    • 成功:结束
    • 失败:调用fullAddCount(x, false)【线程冲突】
private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
    	//线程Hash是否初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            //初始化
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            //初始化后不会发生冲突
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //counterCells是否可以用来计数,即判断他整体是否为空
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //这个桶为空,初始化counterCells
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        //初始化,记录这个桶内的元素数量为要新建的元素个数。
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //尝试拿锁
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                //拿到锁了,将新建的counterCell元素放到指定位置。
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    //初始化创建完成
                                    created = true;
                                }
                            } finally {
                                //无论成功与否都释放锁。
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;//无需扩容
                }
                //存在竞争,重置wasUncontended后,下一句就是rehash,然后重试
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //上面获取锁失败,说明有其他线程得到了锁,那么尝试将值加到BaseCount上
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                //counterCells是否扩容or其长度是否>=处理器数
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                //没有扩容,数组长度<处理器数量,collide为true,计划给CounterCells数组扩容。
                else if (!collide)
                    collide = true;
                //尝试拿锁
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        //给counterCells扩容
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        //放锁
                        cellsBusy = 0;
                    }
                    //不需要扩容
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //更改当前线程的hash值
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //没有执行上面的if说明counterCells为null,初始化counterCells
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];//初始长度为2,最小的2次幂
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;//初始化成功
                    }
                } finally {
                    //放锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //counterCells为空or长度为0,并且拿锁失败。
            //尝试将其加到basecount上
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

如何理解流程呢?
他的主体就是for,循环中三个if

  1. if ((as = counterCells) != null && (n = as.length) > 0)counterCells非空情况
  2. else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1))counterCells为null
  3. else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))counterCells为空且获取锁失败
  4. h = ThreadLocalRandom.advanceProbe(h);重新生成hash
    三种情况,具体的每种情况的逻辑看具体的注释。

读取

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 首节点即匹配
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 该桶已经被迁移,则交由节点find()方法查找
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 搜索桶内所有节点
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

通过查看注释十分好理解,在首点匹配,桶迁移情况(调用find寻找)匹配失败后,搜索所有节点。

Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}

参考

  1. ConcurrentHashMap源码分析(1.8)
  2. 源码级解读ConcurrentHashMap,妈妈再也不用担心我的学的学习
  3. ConcurrentHashMap底层原理剖析
  4. ConcurrentHashMap 1.8 源码分析
上一篇:JetBrains 中代码使用空格缩进,设置Tab键为4个空格


下一篇:Hive代码分析报告(十):语义分析⑤