ConcurrentHashMap使用与分析

使用对比

 HashMap非线程安全,在多线程并发的情况下add/get可能引入死循环,导致cpu利用率趋近于100%

解决方案有HashTable或者Collections.synchronizedMap(map)

这两个解决方案底层对读写方法进行加锁

此外还有一种结构:ConcurrentHashMap也是线程安全的

分别使用HashTable,Collection.synchornizedMap(map),ConcurrentHashMap这三个集合,循环100次创建50个线程往这三个集合中同时添加5000个元素,获取其中的元素, 分析使用不同集合put get的效率

package thread;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class PutThread extends Thread{
    private Map<String, Integer> map;
    private CountDownLatch countDownLatch;
    private String key = this.getId()+"";

    public PutThread(Map<String, Integer> map, CountDownLatch countDownLatch){
        this.map = map;
        this.countDownLatch = countDownLatch;
    }

    public void run(){
        for(int i=0; i<5000; i++){
            map.put(key, i);
        }
        countDownLatch.countDown(); //-1
    }
}

class GetThread extends Thread{
    private Map<String, Integer> map;
    private CountDownLatch countDownLatch;
    private String key = this.getId()+"";

    public GetThread(Map<String, Integer> map, CountDownLatch countDownLatch){
        this.map = map;
        this.countDownLatch = countDownLatch;
    }

    public void run(){
        for(int i=0; i<5000; i++){
            map.get(key);
        }
        countDownLatch.countDown();
    }
}

class TestDemo14 {
    private static final int THREADNUM = 50;
    public static long put(Map<String, Integer> map){
        long start = System.currentTimeMillis();
        //起50个线程添加5000个元素
        CountDownLatch countDownLatch = new CountDownLatch(THREADNUM);
        for(int i=0; i < THREADNUM; i++){
            new PutThread(map, countDownLatch).start();
        }
        try {
            countDownLatch.await(); //计数器为0 打破阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return System.currentTimeMillis()-start;
    }

    public static long get(Map<String, Integer> map){
        //起50个线程获取5000个元素
        long start = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(THREADNUM);

        for(int i=0; i<THREADNUM; i++){
            new GetThread(map, countDownLatch).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return System.currentTimeMillis()-start;
    }

    public static void main(String[] args) {
        Map<String, Integer> hashmapSync = Collections.synchronizedMap(new HashMap<String, Integer>());

        Map<String, Integer> hashtable = new Hashtable<>();

        Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();

        long totalA = 0L; //Collections.synchronizedMap
        long totalB = 0L; //HashTable
        long totalC = 0L; //ConcurrentHashMap
        //计算put方法的总耗时
        for(int i = 0; i < 100; i++){
            totalA += put(hashmapSync);
            totalB += put(hashtable);
            totalC += put(concurrentMap);
        }

        System.out.println("put time Collections.synchronizedMap = " +totalA+".ms");
        System.out.println("put time Hashtable = " +totalB+".ms");
        System.out.println("put time ConcurrentHashMap = " +totalC+".ms");

        totalA = 0L; //Collections.synchronizedMap
        totalB = 0L; //HashTable
        totalC = 0L; //ConcurrentHashMap

        //计算get方法的总耗时
        for(int i=0; i<100; i++){
            totalA += get(hashmapSync);
            totalB += get(hashtable);
            totalC += get(concurrentMap);
        }

        System.out.println("get time Collections.synchronizedMap = " +totalA+".ms");
        System.out.println("get time Hashtable = " +totalB+".ms");
        System.out.println("get time ConcurrentHashMap = " +totalC+".ms");
    }
}

 

ConcurrentHashMap使用与分析

源码分析

1、类的继承关系

2、类的属性

sizeCtl: table的初始化和扩容需要用到的变量

-1 代表table正在初始化

N 代表N-1个线程在进行扩容操作

其他情况:

1)如果table未初始化,table表示初始化的大小

2)如果table初始化完成,表示table的容量,默认0.75*table.size

初始化操作在第一次put完成

concurrencyLevel在jdk1.8的意义改变,并不代表当前所允许的并发数,只是 用来sizeCtl大小,在jdk1.8的并发控制针对具体的桶而言,所以有多少个桶就有 多少个并发数

3) 构造函数 只是sizeCtl初始化,表示table初始化大小

  final V putVal(K key, V value, boolean onlyIfAbsent) {
 *         //ConcurrentHashMap中键和值不能为空
 *         if (key == null || value == null) throw new NullPointerException();
 *         int hash = spread(key.hashCode());
 *         int binCount = 0;
 *         for (Node<K,V>[] tab = table;;) {
 *         //无限循环 (多线程环境)
 *             Node<K,V> f; int n, i, fh;
 *             if (tab == null || (n = tab.length) == 0)
 *             //表为空或者表长度为0
 *                 //初始化表
 *                 tab = initTable();
 *             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
 *                 //表不为空,该桶位置为空时
 *                 if (casTabAt(tab, i, null,
 *                              new Node<K,V>(hash, key, value, null)))
 *                     //CAS方式插入一个新的Node
 *                     break;                   // no lock when adding to empty bin
 *             }
 *             else if ((fh = f.hash) == MOVED)
 *                 //该节点的hash值为Moved,说明当前节点是ForwardingNode,意味着有其他线程
 *                 //在进行扩容,则一起进行扩容操作
 *                 tab = helpTransfer(tab, f);
 *             else {
 *                 V oldVal = null;
 *                 synchronized (f) {
 *                     //加锁同步,针对首个节点进行加锁操作
 *                     if (tabAt(tab, i) == f) {
 *                         //找到table表下标为i的节点
 *                         if (fh >= 0) {
 *                             //正常节点
 *                             binCount = 1;
 *                             for (Node<K,V> e = f;; ++binCount) {
 *                             //无线循环 相当于自旋
 *                                 K ek;
 *                                 if (e.hash == hash &&
 *                                     ((ek = e.key) == key ||
 *                                      (ek != null && key.equals(ek)))) {
 *                                     oldVal = e.val;
 *                                     if (!onlyIfAbsent)
 *                                         e.val = value;
 *                                     break;
 *                                 }
 *                                 Node<K,V> pred = e;
 *                                 if ((e = e.next) == null) {
 *                                     //遍历至最后一个节点
 *                                     pred.next = new Node<K,V>(hash, key,
 *                                                               value, null);
 *                                     //尾插法插入一个新节点
 *                                     break;
 *                                 }
 *                             }
 *                         }
 *                         else if (f instanceof TreeBin) {
 *                             //判断节点类型是否是红黑树类型
 *                             Node<K,V> p;
 *                             binCount = 2;
 *                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
 *                                                            value)) != null) {
 *                                 oldVal = p.val;
 *                                 if (!onlyIfAbsent)
 *                                     p.val = value;
 *                             }
 *                         }
 *                     }
 *                 }
 *                 if (binCount != 0) {
 *                     if (binCount >= TREEIFY_THRESHOLD)
 *                         treeifyBin(tab, i);
 *                     if (oldVal != null)
 *                         return oldVal;
 *                     break;
 *                 }
 *             }
 *         }
 *         //增加binCount容量,检查当前容量是否需要进行扩容
 *         addCount(1L, binCount);
 *         return null;
 *     }

 

 

 

 

 

 

 

 

 

 

上一篇:Java高并发编程基础三大利器之CountDownLatch


下一篇:CountDownLatch使用解说