1、HashMap底层数据结构
HashMap是基于哈希表的Map接口的非同步实现,常见的数据结构有堆栈、队列、数组、链表和红黑树
,Java中最基本的数据结构有两种,一种是数组
,一种是引用
。可以说其他所有的数据结构都可以从这两个最基本结构构造而来,当然HashMap也不例外。HashMap实际上是一个“链表散列”的数据结构,即数组和链表的结合体,所以说HashMap的底层其实就是一个数据结构,被称为哈希表结构
,数组中的每一项又是一个链表。当新建一个HashMap的时候,内部就会初始化一个数组。Entry就是数组中的一个元素
,每个Map.Entry其实就是一个key-value(键值对数据)
,每个节点包含当前元素的hash
、key
、value
和指向下一个元素的引用
,这就构成了链表。
1.1、JDK1.7和JDK1.8中HashMap的重大区别(很重要)
在此处着重强调,JDK1.7和JDK1.8是对HashMap的重大革新。
当面试官问到HashMap的结构和底层实现原理时,目前来说,HashMap是我们非常常用的数据结构,说是由数组和链表组成的数据结构,没人敢说错。但是在JDK1.8中引入一个新的数据结构,那就是红黑树,所以JDK1.8中HashMap是由数组+链表+红黑树
组成的数据结构,元素实例的名称也发生了改变,在JDK1.7中叫Entry,在JDK1.8中叫做Node(即节点),因为红黑树中存储数据全部是节点,实际上是Node<K,V> implements Map.Entry<K,V>
,再详细点说,JDK1.8中是数组和链表和红黑树共存的状态,数组定义的时候就是transient Node<K,V>[] table;
,所以链表和红黑树使用的也是Node节点了。
1.2、JDK1.8中HashMap涉及的数据结构
1、数组(位桶)
/**
* The table, initialized on first use, and resized as
* necessary. When allocated, length is always a power of two.
* (We also tolerate length zero in some operations to allow
* bootstrapping mechanics that are currently not needed.)
*/
transient Node<K,V>[] table;
2、数组节点元素Node<K,V>实现了Map.Entry接口
/**
* Basic hash bin node, used for most entries. (See below for
* TreeNode subclass, and in LinkedHashMap for its Entry subclass.)
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; //hash值
final K key; //键
V value; //值
Node<K,V> next; //指向下一个节点的引用
Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return value; }
public final String toString() { return key + "=" + value; }
public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}
public final V setValue(V newValue) {
V oldValue = value;
value = newValue;
return oldValue;
}
//判断两个node是否相等,若key和value都相等,返回true。可以与自身比较为true
public final boolean equals(Object o) {
if (o == this)
return true;
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
if (Objects.equals(key, e.getKey()) &&
Objects.equals(value, e.getValue()))
return true;
}
return false;
}
}
3、红黑树red-black Tree
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
TreeNode<K,V> parent; // 父节点
TreeNode<K,V> left; //左子树
TreeNode<K,V> right; //又子树
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; //颜色属性
TreeNode(int hash, K key, V val, Node<K,V> next) {
super(hash, key, val, next);
}
//返回当前节点的根节点
final TreeNode<K,V> root() {
for (TreeNode<K,V> r = this, p;;) {
if ((p = r.parent) == null)
return r;
r = p;
}
}
}
1.3、JDK1.8为什么会引入红黑树?
在JDK1.7中HashMap采用的是数组(也称为位桶) + 链表
实现的,我们知道数组的长度是有限的,在有限的长度里我们将每一个元素(键值对)通过key进行哈希计算得到对应的hash值,但是hash本身就存在概率性,也就是说两个不同的key,例如“张三”和“三张”通过hash计算有一定的概率会得到相同的hash值
,这个时候就引发了一个问题hash碰撞(就是所谓的hash冲突)
后续讲解,而链表的引入就是为了解决hash冲突的。
也就是说同一hash值的Node都存储在一个链表中,在JDK1.7中使用头插法(在JDK1.8后使用的尾插法,后续再讲解),也就是说新来的值会占用原有值的位置,而原有值就顺推到了链表的下一个节点,因为写这段代码的作者认为后来插入的值被查找的可能性会更大些,可以提高查找的效率,所以使用了头插法。
那为什么引入红黑树呢?
当位于数组中同一个位置(即链表)中的元素较多时,也就是hash值相同的元素有很多时,全部加在同一个链表上,链表的长度过长,通过key值依次查找的效率会非常低,所以在JDK1.8中引入了红黑树,也就是当数组中某一位置的链表的长度大于等于阈值(8)
时,将链表转换为红黑树结构,这样就大大提高了查找的效率。红黑树和链表的结构图如下:
- 链表结构:这种情况下,需要遍历全部元素才行,时间复杂度为O(n);
- 红黑树结构:其访问性能近似折半查找,时间复杂度为O(logn)。
简单的说下,红黑树是一种近似平衡的二叉查找树,其主要的优点就是“平衡“,即左右子树高度几乎一致,以此来防止树退化为链表,通过这种方式来保障查找的时间复杂度为 O(logn)。
关于红黑树的内容,有太多太多了,主要有一下几个特性:
- 每个节点要么是红色,要么是黑色,但根节点永远是黑色;
- 每个红色节点的两个子节点一定是黑色;
- 红色几点不能连续(也即是,红色节点的子节点和父节点都不能是红色的);
- 从任一节点到其子树中的每个几点的路径都包含相同数量的黑色节点;
- 所有的叶子节点都是黑色的(注意:这里说的叶子节点其实就是上图中的“NIL”节点)。
JDK1.8中HashMap中链表与红黑树共存的状态:
2、HashMap的主要参数都有哪些?
-
table:也称为位桶,就是hashmap中存储数据的数组;
/** * The table, initialized on first use, and resized as * necessary. When allocated, length is always a power of two. * (We also tolerate length zero in some operations to allow * bootstrapping mechanics that are currently not needed.) */ transient Node<K,V>[] table;
- bin:就是挂在数组上的链表;
- binCount:表示发生hash冲突的链表上节点的个数,超过阈值8就有转化为红黑树的条件(前提是数组长度要>=64);
-
TREEIFY_THRESHOLD :8,链表转化为红黑树的阈值(前提条件是数组容量大于64)
/** * The bin count threshold for using a tree rather than list for a * bin. Bins are converted to trees when adding an element to a * bin with at least this many nodes. The value must be greater * than 2 and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage. */ static final int TREEIFY_THRESHOLD = 8;
-
TreeNode:红黑树;
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> { TreeNode<K,V> parent; //父节点 TreeNode<K,V> left; //左子树 TreeNode<K,V> right; //右子树 TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; //颜色属性(红与黑) TreeNode(int hash, K key, V val, Node<K,V> next) { super(hash, key, val, next); } }
-
initialCapacity: table数组的默认初始容量为16;
1<<4 位运算 等于16
,因为位运算比乘法运算效率高的多;/** * The default initial capacity - MUST be a power of two. */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
-
UNTREEIFY_THRESHOLD : 6,链表长度小于等于6,红黑树就有转化为链表的机会(反树化);
/** * The bin count threshold for untreeifying a (split) bin during a * resize operation. Should be less than TREEIFY_THRESHOLD, and at * most 6 to mesh with shrinkage detection under removal. */ static final int UNTREEIFY_THRESHOLD = 6;
-
MIN_TREEIFY_CAPACITY :64 ,链表转化为红黑树时的前提条件,table数组的最小长度阈值(或者说即使数组中某个链表的长度达到了阈值8,也不会立即转换结构为红黑树,而是先判断数组长度是否大于64,如果不足64,首先是resize扩容数组大小(扩容2倍);如果大于64,那些长度大于阈值8的链表结构才会转化为红黑树)
/** * The smallest table capacity for which bins may be treeified. * (Otherwise the table is resized if too many nodes in a bin.) * Should be at least 4 * TREEIFY_THRESHOLD to avoid conflicts * between resizing and treeification thresholds. */ static final int MIN_TREEIFY_CAPACITY = 64;
-
MAXIMUM_CAPACITY:数组的最大容量,
1<<30 左移30位
/** * The maximum capacity, used if a higher value is implicitly specified * by either of the constructors with arguments. * MUST be a power of two <= 1<<30. */ static final int MAXIMUM_CAPACITY = 1 << 30;
-
loadFactory:负载因子,默认初始值为0.75f(也就是数组容量的占比为75%),当数组的容量达到
capacity * loadfactory
的容量时,会自动resize扩容;/** * The load factor used when none specified in constructor. */ static final float DEFAULT_LOAD_FACTOR = 0.75f;
3、HashMap的存取原理(put和get方法)
3.1、map.put()方法实现原理
当我们往HashMap中put元素的时候,先根据key的hashCode重新计算hash值,根据hash值得到这个元素在数组中的位置(即下标索引),如果数组在该位置上已经存放有其他元素了,那么在这个位置上的元素将以链表的形式存放,新加入的元素放在链表的头部,先前加入的元素都向后顺延一个位置。如果数组在该位置上没有元素,就直接将该元素放到此数组的该位置上。当系统决定存储HashMap中的key-value(键值对)时,完全没有考虑Entry中value值,仅仅只是根据key来计算并决定每个Entry对象的存储位置。我们完全可以把Map集合中的value当成key的附属,当系统决定了key的存储位置之后,value随之保存在那里即可。
hash(int h)方法根据key的hashCode值重新计算一次散列。此算法加入了高位计算,防止低位不变,高位变化时,造成的hash冲突。对于任意给定的对象,只要它的hashCode()返回值相同,那么程序调用hash(int h)方法所计算得到的hash值总是相同的。我们首先想到的就是把hash值与数组长度取模运算,这样一来,元素的分布相对来说是比较均匀的。但是“模”运算的消耗还是比较大的,在HashMap中是这样做的:调用indexFor(int h,int length) 方法来计算该对象应该保存在table数组的哪个索引处。
- 首先将(k、v)封装到Node节点对象当中;
- 然后底层会调用k的hashCode()方法得出hash值;
- 通过哈希表函数/哈希算法,将hash值转换成数组的下标(用hash值与数组的长度进行取模运算或者位运算),就把Node添加到这个位置上,如果说下标对应的位置上有链表。此时就会拿着k和链表上每个Node节点的k进行equals对比,如果所有的equals方法返回的都是false,那么这个新的节点将被添加到链表的头部(JDK1.7)或末尾(JDK1.8),如果其中有一个equals方法返回的是true,那么这个节点的value值将会被覆盖。
看put方法源码:
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
/**
* Implements Map.put and related methods
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
//如果在数组table[(n-1) & hash]处的值为空,则新建一个Node节点,插入在该位置
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
//此处表示有hash冲突,开始处理冲突(说明此位置有单向链表存在)
Node<K,V> e; K k;
//检查第一个Node,p是不是要找的hash值,并逐步对每个节点上的key进行equals
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
//此处判断是否为树结构,如果是直接将key-value插入树节点中
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
//binCount是记录当前链表发生冲突的节点数
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
//如果发生冲突的节点数大于等于阈值8,看是否需要改变冲突节点的存储结构;
//treeifyBin判断当前HashMap数组的长度,如果不足64,
//只对数组table进行resize扩容,暂且不对冲突节点达到8的链表进行结构转换
//当扩容后数组table的长度达到了64后,再将冲突节点数达到8的链表的存储结构转化为红黑树
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
//如果有相同的key值就结束遍历,直接将该key的value值覆盖掉
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
//返回存在的Value值
return oldValue;
}
}
++modCount;
//如果当前大小大于门限,门限=table初始容量*0.75
if (++size > threshold)
resize();//扩容2倍(新容量 = 旧容量*2)
afterNodeInsertion(evict);
return null;
}
HashMap的treeifyBin()方法源码解析:
final void treeifyBin(Node<K,V>[] tab, int hash) {
//定义几个变量,n是数组长度,index是索引
int n, index; Node<K,V> e;
//这里的tab指的是本HashMap中的数组,n为数字长度,如果数组为null或者数组长度小于64
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
//则调用resize()方法直接扩容,不转红黑树
resize();
//否则说明满足转红黑树条件,通过按位与运算取得索引index,并将该索引对应的node节点赋值给e,e不为null时
else if ((e = tab[index = (n - 1) & hash]) != null) {
//定义几个变量,hd代表头节点,tl代表尾节点
TreeNode<K,V> hd = null, tl = null;
do {
//先把e节点转成TreeNode类型,并赋值给p
TreeNode<K,V> p = replacementTreeNode(e, null);
//如果尾节点tl为空,则说明还没有根节点,试想下,这时元素数目都超过8个了,还能没有尾节点么,所以没有尾节点只能说明还没设置根节点
if (tl == null)
//设置根节点,把p赋值给根节点hd
hd = p;
else {
//把tl设置为p的前驱节点
p.prev = tl;
//把p设置为tl的后继节点,这两步其实就是我指向你,你指向我的关系,为了形成双向链表
tl.next = p;
}
//把首节点设置成p后,把p赋值给尾节点tl,然后会再取链表的下一个节点,转成TreeNode类型后再赋值给p,如此循环
tl = p;
//取下一个节点,直到下一个节点为空,也就代表这链表遍历好了
} while ((e = e.next) != null);
//用新生成的双向链表替代旧的单向链表,其实就是把这个数组对应的位置重新赋值成新双向链表的首节点
if ((tab[index] = hd) != null)
//这个方法里就开始做各种比较,左旋右旋,然后把双向链表搞成一个红黑树
hd.treeify(tab);
}
}
3.2、map.get()方法实现原理
- 先调用k的hashCode方法得出哈希值,并通过hash算法计算出当前k在数组中的下标;
- 通过上一步哈希算法计算出数组的下标后,再通过数组下标快速定位到某个位置上,如果这个位置上什么也没有,则返回null;如果这个位置上有单向链表,那么就拿着k与该链表上每一个节点的k进行equals,如果所有的equals方法返回的都是false,则get方法返回null值;如果其中一个equals方法返回的是true,那么此时该节点的value就是我们要找的对应value值了,get方法最终会返回这个value。
看get方法源码:
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
/**
* Implements Map.get and related methods
*
* @param hash hash for key
* @param key the key
* @return the node, or null if none
*/
final Node<K,V> getNode(int hash, Object key) {
//tab是Entry对象数组,first是tab数组中经过散列的第一个位置
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
//通过hash值与(n-1)进行位运算得到下标索引,tab[(n-1) & hash]
//一条链上的hash值是相同的,所以默认得到的是头节点,first = tab[(n-1) & hash]
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
//判断第一个Node节点是不是要找的Node
//判断条件,hash值要相同,key值要相同
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
//如果头结点不是要找的Node节点,就判断头结点指向的下一个Node节点
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
//此处做do-while循环判断该链表上所有节点,
//直到找到key值和hash值都相同的节点,然后直接返回
} while ((e = e.next) != null);
}
}
return null;
}
3.3、HashMap的存取归纳总结
HashMap在底层将key-value当成一个整体进行处理,这个整体就是一个Entry对象,HashMap底层采用的是一个Entry[]数组来保存所有的key-value(键值对)的。
- 当需要存储一个Entry对象时,会根据hash算法来决定其在数组中的存储位置,如果位置上有链表,再根据equals方法决定其在数组该位置上的链表中的存储位置;
- 当需要取出一个Entry对象时,也会根据hash算法找到其在数组中对应的位置,果位置上有链表,再根据equals方法从该位置上的链表中取出该Entry对象的value值。
4、HashMap默认初始化大小是多少?为什么?为啥都是2的整数次幂?
4.1、HashMap初始值大小
-
initialCapacity: table数组的默认初始容量为16;
1<<4 位运算 等于16
/** * The default initial capacity - MUST be a power of two. */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
我们在创建HashMap的时候,阿⾥巴巴规范插件会提醒我们最好赋初值,⽽且最好是2的
幂。
4.2、为什么选择初始值为16呢?
因为选择16是为了位运算更加方便,位运算比乘法运算效率高的多,而且选择16是为了服务于将key映射到index的算法。
index索引的计算公式:
//& 按位与运算
int index = key.hashCode() & (length - 1)
例如:
十进制: 201314
二进制: 11 0001 0010 0110 0010
选择数组初始值length为16,那么(length-1)
就等于15
十进制:15
二进制:1111
此时按位与得出下标索引index值
index = 11 0001 0010 0110 0010 & 1111
index = 0010 = 2
从上述按位与可以看出,选择长度为16,length-1为15,二进制位全为1,这种情况下,所有index的结果就等同于hashCode()计算出的hash值的最后四位,只要输入的key本身均匀分布,hashCode()计算得出的hash值就是均匀分布的,而hash算法的最终结果index就是均匀分布的,这就是为了实现均匀分布
。
实现均匀分布,就是为了减少hash碰撞(冲突)的概率,并且提高了hashMap的查询速度。
4.3、为啥都是2的整数次幂?
那么既然16可以,是不是只要是2的整数次幂就可以呢?没毛病。
那为什么不选择8、32呢? 因为是8的话,容量太小很容易触发map扩容机制,影响性能,如果是32的话稍微太大又会浪费资源,所以就使用16作为初始大小最为合适。
总结:
- 为了减少hash碰撞;
- 提高map查询效率;
- 避免分配过小容易频繁扩容(影响性能);
- 避免分配过大浪费资源。
5、HashMap的扩容机制?负载因子?为什么是这么多?
5.1、hashMap的扩容机制resize方法
我们知道数组容量是有限的,当插入的数据到达一定的数量后,数组就会自动扩容,这个时候就会调用resize()
方法进行扩容。
说道resize()方法,有两个重要的相关因素:
- Capacity:HashMap当前⻓度(初始赋值Capacity即为所赋的值,没有赋值默认就为16)。
-
LoadFactory:负载因子,默认初始值为0.75f(也就是数组容量的占比为75%);
/** * The load factor used when none specified in constructor. */ static final float DEFAULT_LOAD_FACTOR = 0.75f;
举个栗子:就⽐如当前数组的容量⼤⼩为100,当你在存进去第76个数据的时候,内部机制判断发现需要进⾏resize扩容了,但是HashMap的扩容也不是简单的扩⼤点容量这么简单的。
看下resize()方法的源码:
/**
* Initializes or doubles table size. If null, allocates in
* accord with initial capacity target held in field threshold.
* Otherwise, because we are using power-of-two expansion, the
* elements from each bin must either stay at same index, or move
* with a power of two offset in the new table.
*
* @return the table
*/
final Node<K,V>[] resize() {
//未扩容前的数组
Node<K,V>[] oldTab = table;
//当前数组长度
int oldCap = (oldTab == null) ? 0 : oldTab.length;
//当前数组的阈值
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
//此处通过扩大二倍后的newCap * loadFactor 计算得出新的阈值
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
//新建一个数组,容量为newCap,原数组容量的二倍
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
//新建Node节点
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
//将就数组中的key-value(键值对),重新通过hash计算,分配到新数组中
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
由上述代码可以看出,扩容大致分为两步:
- 扩容:创建一个新的Entry空数组,长度是原数组的2倍;
- ReHash:遍历原数组中的所有数据,通过hash算法,重新计算出所有数据的下标索引,进行分配存储位置。
这个时候,有人会问为什么要重新Hash计算下标呢,直接复制过去不香么?
这个时候要稳住,莫慌,其实原理很简单!
是因为数组⻓度扩⼤以后,Hash的规则也随之改变。
//数据下标索引计算公式
index = key.HashCode() & (Length - 1)
你看哈,原数组长度(Length)是16你,十进制数201314
通过位运算得出的index索引值是2,那新数组的⻓度是32了,通过位运算出来的值能一样吗?
5.2、负载因子LoadFactory为什么是0.75,而不是0.8,0.6呢?
-
LoadFactory:负载因子,默认初始值为0.75f(也就是数组容量的占比为75%);
/** * The load factor used when none specified in constructor. */ static final float DEFAULT_LOAD_FACTOR = 0.75f;
负载因子是用来表示 HashMap 中数据的填满程度:
//Capacity是数组总长度
loadFactory = (哈希表中的数据量) / Capacity
哈希表这样的数据结构容易产生两个问题:
1、数组的容量过小,经过哈希计算后的下标,容易出现冲突;
2、数组的容量过大,导致空间利用率不高。
这就意味着两个问题:
1、加载因子越小,填满的数据就越少,哈希冲突的几率就减少了,但浪费了空间,而且还会提高扩容的触发几率;
2、加载因子越大,填满的数据就越多,空间利用率就高,但哈希冲突的几率就变大了。
综上所述,就是必须在==哈希冲突==和==空间利用率==两者之间做到有所取舍,其实是尽量保持平衡,谁也不碍着谁。
我们知道,HashMap 是通过拉链法来解决哈希冲突的。
为了减少哈希冲突发生的概率,当HashMap的数组长度达到一个==临界值==的时候,就会触发resize扩容机制,扩容过程中会将原数组中的数据,通过ReHash重新计算索引值,拷贝到新数组中,这是一个相当耗时的操作。
//initialCapacity :数组初始容量
//LoadFactory:负载因子
临界值 = initialCapacity * LoadFactory
也就是说,当 16*0.75=12 时,会触发扩容机制。为什么加载因子会选择 0.75 呢?而不是0.8、0.6呢?
其实这跟统计学里的一个很重要的原理——泊松分布有关,二项式说实话真不会了,本来就没学懂,想了解更深的伙子可以自行去看看了。
具体是用这个公式表示的:
在HashMap的源码文档中,大概175行处,有这样一段英文描述:
* Because TreeNodes are about twice the size of regular nodes, we
* use them only when bins contain enough nodes to warrant use
* (see TREEIFY_THRESHOLD). And when they become too small (due to
* removal or resizing) they are converted back to plain bins. In
* usages with well-distributed user hashCodes, tree bins are
* rarely used. Ideally, under random hashCodes, the frequency of
* nodes in bins follows a Poisson distribution
* (http://en.wikipedia.org/wiki/Poisson_distribution) with a
* parameter of about 0.5 on average for the default resizing
* threshold of 0.75, although with a large variance because of
* resizing granularity. Ignoring variance, the expected
* occurrences of list size k are (exp(-0.5) * pow(0.5, k) /
* factorial(k)). The first values are:
*
* 0: 0.60653066
* 1: 0.30326533
* 2: 0.07581633
* 3: 0.01263606
* 4: 0.00157952
* 5: 0.00015795
* 6: 0.00001316
* 7: 0.00000094
* 8: 0.00000006
* more: less than 1 in ten million
大致意思如下:
因为 TreeNode(红黑树)的大小约为链表节点的两倍,所以我们只有在一个拉链已经拉了足够节点的时候才会转为tree(参考TREEIFY_THRESHOLD)。并且,当这个hash桶的节点因为移除或者扩容后resize数量变小的时候,我们会将树再转为拉链。如果一个用户的数据的hashcode值分布得很均匀的话,就会很少使用到红黑树。理想情况下,我们使用随机的hashCode值,加载因子为0.75的情况,尽管由于粒度调整会产生较大的方差,节点的分布频率仍然会服从参数为0.5的柏松分布。链表的长度为8发生的概率仅有
千万分之一
。
虽然这段话的本意更多的是表示JDK1.8中为什么拉链长度超过8的时候进行了链表转化红黑树,但是提到了0.75这个负载因子,但这并不是loadFactory等于0.75的真正答案。
本段参考二哥文章:https://itwanger.gitee.io/tobebetterjavaer/#/docs/collection/hashmap-loadfactor
和另一位哥的文章:https://segmentfault.com/a/1190000023308658
这个负载因子是从JDK1.8引入的,而且提到了0.75,看了这段话的意思,如果不好好理解,很容易就认为这段话的意思就是阐述0.75是怎么来的了,然后就简答的把泊松分布与0.75的由来连接到0.75上去了。然而,==这段话的本意其实更多的是表示JDK1.8中为什么拉链的长度超过8的时候会进行红黑树的转换。这个柏松分布的模型其实就是基于当负载因子为0.75时的模型去进行模拟演算的。==
考虑到HashMap的容量是有一个要求的:它必须是2的次幂。当加载因子选择了0.75就可以保证它与容量的乘积为整数。如下:
16 * 0.75 = 12
32 * 0.75 = 24
除了0.75,0.5~1之间还有0.625(5/8)、0.875(7/8)可选,从中位数的角度,挑0.75是最为合适的。另外,*上说,拉链法(解决哈希冲突的一种方法)的加载因子最好限制在0.7-0.8以下,超过0.8,查表的时候CPU缓存不命中(cache missing)会按照指数前上升。
综上所述,0.75 是个比较完美的选择。
6、解决hash冲突的办法有哪些?HashMap用的哪种?
解决Hash冲突的方法如下:
- 开放定址法:也称为再散列法,基本思想就是,如果p=hash(key)出现冲突时,则以p为基础,再次hash,p1=hash(p),如果p1再次出现冲突,则以p1位基础,再次hash,以此类推,直到找到一个不冲突的哈希地址pi。因此开放定址法所需要的hash表的长度要大于所需要存放的元素,而且因为存在再次hash,所以只能在删除的节点上做标记,而不能真正删除结点。
- 再哈希法:双重散列、多重散列,提供多种不同的hash函数,当r1=hash1(key1)发生冲突时,在计算第二种算法r2=hash2(key1),直到没有冲突为止。这样虽然不易产生堆集,但增加了计算的时间。
- 链地址发:也称拉链法,将哈希值相同的元素构成一个同义词的单链表并将哈希表的头指针存放在哈希表的第i个单元中,查找、插入和删除主要在同义词链表中进行。链表法适用于经常进行插入和删除的情况。
- 建立公共溢出区:将哈希表分为公共表和溢出表,当溢出发生时,将所有溢出数据统一放到溢出区。
==HashMap采用的是链地址法(即拉链法)。==
7、承接上文,为啥JDK1.8之前用头插法,JDK1.8之后改成了尾插法?
7.1、两种插法
头插法:就是每次在一个链表中put新值时,新的键值对会占用头指针指向的旧数据的位置,也就是说新来的值会占用原有值的位置,而原有值就顺推到了链表的下一个节点,因为写这段代码的作者认为后来插入的值被查找的可能性会更大些,可以提高查找的效率,所以使用了头插法。
尾插法:从字面理解就是新插入的值会依次插入链表的下一个位置,不会改变原来所有数据的位置。
那为啥JDK1.8之前用头插法,JDK1.8之后改成了尾插法?
我们先举个栗子,我们现在往一个容量大小为2的数组中put两个值,负载因子默认为0.75,而2*0.75=1.5,取整为1,那我们往里面put俩值,很显然当我们put第二个值的时候,数组就要触发resize()扩容机制了。
The next size value at which to resize (capacity * load factor).
现在我们要在容量为2的容器里面用不同线程
插入A(k1,v1),B(k2,v2),C(k3,v3),假如我们在热size之前打个断点,那意味着数据都插入了,但是还没扩容,那么扩容前可能会出现如下情况:
我们可以看到链表的指向为A->B->C:
因为resize()方法的扩容方式,在JDK1.8前使用的是单链表的头插法,同一位置上的新元素总是会被放在链表的头部位置,如果发生扩容,在旧数组中同一个Entry单链表上的元素,通过Rehash重新计算每个Entry的索引位置后,有可能被放到新数组中的不同位置上,如下所示:
由上述图可知,不光是元素的位置可能发生改变,同一个链表中的引用指向顺序也可能发生改变,原本是A指向B,而现在是B指向了A,但是在Aput进旧数组中的时候,A元素会存储当前键值对数据中的hash
、key
、value
和指向B元素的引用
,而现在重新计算后,B指向了A,说明B中也有了指向A元素的引用,所以⼀旦⼏个线程都调整完成,就可能出现环形链表
的情况:
如果这个时候无论哪一个线程去取值,悲剧就出现了——Infinite Loop(无限循环)
。
那为啥子JDK1.8后使用了尾插法?
上述我们说了,使用头插法会改变链表上元素的顺序,但是如果使用尾插法,在扩容后会保持链表元素原本的顺序,就不会出现环形链表的问题了。
就是说原本是A指向B,在扩容后那个链表还是A指向B,顺序和指向引用都不会改变。
7.2、总结
- JDK1.8之前在多线程操作HashMap的时候可能会引起死循环,原因是扩容转移前后,链表中元素的顺序混乱倒置,在转移过程中修改了原来链表中节点的引用关系。
- JDK1.8之后在同样的多线程操作HashMap时,不会引起死循环,原因是在扩容转移过程中链表中节点指向顺序不会改变,仍然保持之前节点的引用关系。
8、HashMap为啥是非线程安全?
文章来自二哥:https://itwanger.gitee.io/tobebetterjavaer/#/docs/collection/hashmap-thread-nosafe
三个原因:
- 多线程下扩容链表会发生死循环(特指JDK1.8之前,1.8之后是尾插法,不会发生死循环);
- 多线程下进行put会导致元素丢失;
- put和get高并发时会导致get到null值。
咱们来一一分析:
8.1、多线程下扩容链表会发生死循环
众所周知,HashMap是通过拉链法来解决哈希冲突的,也就是当哈希冲突时,会将相同的哈希值的键值对数据通过链表的形式保存起来。
JDK1.7时,采用的是头插法的方式来存放链表的,也就是下一个冲突的键值对会放在上一个键值对的前面(同一位置上的新元素被放在链表的头部),扩容的时候就有可能会出现唤醒链表,造成死循环。
resize方法的源码(JDK1.7的):
// newCapacity为新的数组容量
void resize(int newCapacity) {
// 小数组,临时过度下
Entry[] oldTable = table;
// 扩容前的容量
int oldCapacity = oldTable.length;
// MAXIMUM_CAPACITY 为最大容量,2 的 30 次方 = 1<<30
if (oldCapacity == MAXIMUM_CAPACITY) {
// 容量调整为 Integer 的最大值 0x7fffffff(十六进制)=2 的 31 次方减1
threshold = Integer.MAX_VALUE;
return;
}
// 初始化一个新的数组(大容量)
Entry[] newTable = new Entry[newCapacity];
// 把小数组的元素转移到大数组中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
// 引用新的大数组
table = newTable;
// 重新计算阈值
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
transfer 方法用来转移,将小数组的元素拷贝到新的数组中。
void transfer(Entry[] newTable, boolean rehash) {
// 新的容量
int newCapacity = newTable.length;
// 遍历旧数组,table是旧数组
for (Entry<K,V> e : table) {
while(null != e) {
// 拉链法,相同 key 上的不同值
Entry<K,V> next = e.next;
// 是否需要重新计算 hash
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
// 根据新数组的容量,和键的 hashCode值, hash计算元素在数组中的下标
int i = indexFor(e.hash, newCapacity);
// 同一位置上的新元素被放在链表的头部
e.next = newTable[i];
// 放在新的数组上
newTable[i] = e;
// 链表上的下一个元素
e = next;
}
}
}
注意 e.next = newTable[i]
和 newTable[i] = e
这两行代码,就会将同一位置上的新元素放在链表的头部。
//hash键的hash值,capacity数组长度,两者取模,得出下标索引
index = hash % capacity
扩容前的样子加入如下图所示:
那么正常扩容后,如下图所示:
假设现在有两个线程同时进行扩容,线程 A 在执行到 newTable[i] = e;
被挂起,此时线程 A 中:e=3、next=7、e.next=null
:
线程 B 开始执行,并且完成了数据转移。
随后线程A获得CPU时间片继续执行 newTable[i] = e,将3放入新数组对应的位置,执行完此轮循环后线程A的情况如下:
执行下一轮循环,此时 e=7,原本线程 A 中 7 的 next 为 5,但由于 table 是线程 A 和线程 B 共享的,而线程 B 顺利执行完后,7 的 next 变成了 3,那么此时线程 A 中,7 的 next 也为 3了。
采用头部插入的方式,变成了下面这样子:
看上去好像没什么问题,此时next = 3,e = 3
。
进行下一轮循环,但此时,由于线程B将3的next变为了null,所以此轮循环应该是最后一轮了。
接下来当执行完e.next = newTable[i]
即3.next = 7,3和7之间就相互连接指向了,执行完newTable[i] = e
后,3被头插法重新插入到链表中,执行结果如下图所示:
套娃开始,元素 5 也就成了弃婴,惨惨惨~~~
不过,JDK1.8 时已经修复了这个问题,链表中使用尾插法,扩容时会保持链表原来的顺序。
8.2、多线程下 put 会导致元素丢失
正常情况下,当发生哈希冲突时,HashMap 是这样的:
但多线程同时执行 put 操作时,如果计算出来的索引位置是相同的,那会造成前一个 key 被后一个 key 覆盖,从而导致元素的丢失。
put方法源码:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// 步骤①:tab为空则创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 步骤②:计算index,并对null做处理
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 步骤③:节点key存在,直接覆盖value
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 步骤④:判断该链为红黑树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 步骤⑤:该链为链表
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
//链表长度大于8转换为红黑树进行处理
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
// key已经存在直接覆盖value
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 步骤⑥、直接覆盖
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
// 步骤⑦:超过最大容量 就扩容
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
问题发生在步骤 ② 这里:
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
加入两个线程都执行了 if 语句,假设线程 A 先执行了 tab[i] = newNode(hash, key, value, null)
,那 table 是这样的:
接着,线程 B 执行了 tab[i] = newNode(hash, key, value, null)
,那 table 是这样的:
3被干掉了。
8.3、put 和 get 并发时会导致 get 到 null
线程 A 执行put时,因为元素个数超出阈值而出现扩容,线程B 此时执行get,有可能导致这个问题。
注意来看 resize 源码:
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的2倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
// 计算新的resize上限
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
}
线程 A 执行完 table = newTab 之后,线程 B 中的 table 此时也发生了变化,此时去 get 的时候当然会 get 到 null 了,因为元素还没有转移。
9、如何解决HashMap在多线程环境下存在线程安全问题?
三种解决方法:
- Collections.synchronizedMap(Map)创建线程安全的Map集合
- HashTable
- ConcurrentHashMap
不过出于线程并发度的原因,我都会舍弃前两者使⽤最后的ConcurrentHashMap,他的性能和效率明显⾼于前两者。
9.1、Collections.synchronizedMap(Map)创建线程安全的Map集合
在SynchronizedMap内部维护了一个普通的对象Map,还有排斥锁mutex,如下图:
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this; //如果没有传入mutex参数,则赋值为当前synchronizedMap对象
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
Collections.synchronizedMap(new HashMap<>(16));
我们在调用这个方法的时候需要传入一个Map,可以看到右两个构造方法,如果你传入了mutex参数,则将对象排斥锁赋值为传入的对象。如果没有传入mutex参数,则将对象排斥锁赋值为this,即调用synchronizedMap对象,就是上面的Map。
创建出synchronizedMap之后,再操作map的时候,就会对⽅法上锁,如下图全是锁:
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
如上所述,调用Collections.Synchronized()方法初始化HashMap时,内部对Map的操作全是基于sychronized同步锁代码块,你说能不安全不!!!
9.2、HashTable
啥也别说,直接上源码,
与HashMap不同之处,在于HashTable内部对数据的读取操作,也是都上了锁:
get方法源码:
@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}
put方法源码:
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
还有HashTable是不允许键和值中出现null值,而HashMap可以允许键和值均为null值。
9.2.1、为什么HashMap允许key和value为null值?
首先我们还是看下HashMap的key和value为什么可以为NULL值:
//value
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
//key
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
可以看出HashMap对key为null值时做了处理,通过hash计算的时候,会判断key如果为null值,直接返回hash结果为0,所以一个HashMap中只允许有一个null值的key,后续的键值对如果key为null值,put时会直接把原来的value覆盖替换掉。
9.2.2、为什么HashTable不允许key和value为空值?
首先是因为HashTable内部对null值的key和value做了异常处理。
原因是因为HashTable使用的是安全失败机制(fail-safe)
,这种机制会使你此次读到的数据不一定是最新的数据。
如果你使用的是null值,就会使其无法判断对应的key是不存在还是为空,
因为你无法再调用一次contain(key)来对key是否存在进行判断,此处ConcurrentHashMap同理。
与HashMap不同之处:
- key和value是否可为null值:Hashtable否,HashMap是;
- 实现方式不同:HashTable继承了Dictionary类,而HashMap继承的是AbstractMap类;
- 初始化容量不同:HashTable的初始容量为:11,HashMap初始化容量为:16,两者的负载因子默认都为:0.75;
- 扩容机制不同:当现有容量达到扩容阈值时,,HashTable的扩容规则为原数组的2倍 + 1,HashMap扩容机制为源数组的2倍;
-
迭代器不同:HashTable的
Enumerator
不是快速失败机制(fail-fast)
,而HashMap的Iterator
迭代器是快速失败机制(fail-fast)
; -
抛出异常不同:所以当多个线程改变了HashMap的结构,如:增加、删除元素时,将会抛出
ConcurrentModificationException异常
,而HashTable则不会。
9.2.3、快速失败机制(fail-fast)
应用场景:快速失败机制(fail-fast)是Java中的一种机制,在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增删改),则会抛出ConcurrentModificationException异常
。
原理:迭代器在遍历时直接访问集合中的内容,并且在遍历的过程中使用一个modeCount
变量;
上述就是集合在被遍历期间如果内容发生了变化,就会改变modCount的值,而迭代器在使用hashNext()/next()方法遍历下一个元素前,都会先检测当前modCount值是否为ExpectedModCount所预期的值,是的话就返回遍历;否则就抛出异常,终止遍历。
这⾥异常的抛出条件是检测到 modCount!=expectedmodCount 这个条件。如果集合发⽣变化时修改过的modCount值刚好⼜设置为了ExpectedModCount值,则异常不会抛出。
因此,不能依赖于这个异常是否抛出而进行并发操作的编程,这个异常只建议用于检测并发修改的bug。
Java.util包下的集合类
都是快速失败的,不能在多线程下发生并发修改(迭代过程中被修改),算是一种安全机制吧。
9.2.4、安全失败机制(fail-safe)
应用场景:Java.util.concurrent包下的容器都是安全失败的,可以在多线程下并发使用,并发修改。
原理:采用安全机制的集合容器,在遍历时不是直接在集合内容*问,而是先将原有集合复制一份,在拷贝的这份集合上进行遍历。所以在遍历过程中,其他线程对原集合无论做出怎样的修改,都跟迭代过程无关,所以不会触发ConcurrentModificationException异常
。
缺点:基于拷贝的内容的优点是避免了ConcurrentModificationException,但同样地,迭代器并不能访问到修改后的内容,即:迭代器遍历的是开始遍历那一刻拿到的拷贝集合,在遍历期间原集合做的任何修改,迭代器都不知道。
9.3、ConcurrentHashMap
应用场景:在开发过程中大都是使⽤ConcurrentHashMap,因为它的并发性效率相⽐前两者好很多。
9.3.1、ConcurrentMap的底层数据结构(JDK1.7)
底层数据结构:底层是基于数组+链表
组成的,JDK1.7和JDK1.8中具体的实现稍有不同。
先说一下JDK1.7中的数据结构:
如图所示:是由Segment数组、HashEntry组成,和HashMap一样,仍然是数组+链表
。
Segment是ConcurrentHashMap的一个内部类,主要的组成如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
// 和 HashMap 中的 HashEntry 作⽤⼀样,真正存放数据的数组位桶
transient volatile HashEntry<K,V>[] table;
transient int count;
// 快速失败(fail—fast),起关键作用的变量
transient int modCount;
// 阈值⼤⼩
transient int threshold;
// 负载因⼦
final float loadFactor;
}
HashEntry和HashMap是差不多的,但是不同点是,HashEntry使用的是volatile
去修饰它的数据value,还有指向下一个节点的next;
volatile关键字的特性是啥:
- 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这个新的值对其他线程来说是可见的(实时可见性);
- 禁止进行指令重排序(实现有序性);
- volatile只能保证对单次读/写的原子性,像i++这种操作不能保证原子性。
9.3.2、如何实现高并发和线程安全的?
原理上来说,ConcurrentHashMap采用了分段锁
技术,其中Segment继承于ReentranLock
。
不会像HashTable那样不管是put还是get操作,都需要做同步操作处理,理论上ConcurrentHashMap支持CurrencyLevel(Segment数组数量)
的线程并发。
每当一个线程占用锁访问一个Segment时,不会影响到其他的Segment数组。
就是说如果集合的容量是16它的并发度就是16,可以同时允许16个线程操作16个Segment而且还是线程安全的。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
//这就是为啥ConcurrentHashMap不可以put null值的原因
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
它先定位到Segment,然后再进⾏put操作。
我们看看下put源代码,是如何做到线程安全的?
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 遍历该 HashEntry,如果不为空则判断传⼊的 key 和当前遍历的 key 是否相等,相等则覆盖旧的value。
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}else {
// 不为空则需要新建⼀个 HashEntry 并加⼊到 Segment 中,同时会先判断是否需要扩容。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//释放锁
unlock();
}
return oldValue;
}
首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut()自旋获取锁
。
- 尝试自旋获取锁;
- 如果重试的次数达到了
MAX_SCAN_RESTRIES
则改为阻塞锁获取,保证能获取成功。
9.3.3、get方法的逻辑
ConcurretnHashMap集合的get方法逻辑比较简单,只需要通过hash计算之后定位到具体的Segment,再通过一次hash定位到具体的元素上。
get步骤:
- 根据计算出来的hashCode寻址,如果在数组位桶上有对应的地址,就直接返回value;
- 如果是红黑树,那就按照树的方式去获取值;
- 上述两者都不满足就按照链表的方式遍历获取值。
get源代码:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//根据计算出来的hashCode寻址,如果在数组位桶上有对应的地址,就直接返回value;
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果是红黑树,那就按照树的方式去获取值;
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//上述两者都不满足就按照链表的方式遍历获取值。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
由于HashEntry中的value属性是用volatile关键词修饰的,保证了内存的可见性,所以每次get时获取到的都是最新值。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//volatile 修饰val
volatile V val;
//volatile 修饰引用
volatile Node<K,V> next;
Node(int hash, K key, V val) {
this.hash = hash;
this.key = key;
this.val = val;
}
Node(int hash, K key, V val, Node<K,V> next) {
this(hash, key, val);
this.next = next;
}
}
ConcurrentHashMap的方法是非常高效的,因为整个过程都不需要加锁。
在JDK1.7中ConcurrentHashMap虽然支持每个Segment并发访问,但是还是存在一些问题:
缺点:因为它的底层数据结构仍然是数组加链表的形式,我们去查询的时候,还得遍历整个链表,会导致效率很低,这个跟JDK1.7中的HashMap是存在一样的问题,所以它在JDK1.8中完全优化了。
9.3.4、ConcurrentHashMap的底层数据结构(JDK1.8)
其中抛弃了原有的Segment分段锁,而采用了CAS + synchronized
来保证并发安全性。
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把value和next采用了volatile
去修饰,保证了可见性,并且也引入了红黑树,在链表中大于一定值的时候会转化为(默认值是8)。
9.3.5、ConcurrentHashMap进行put操作(JDK1.8)
ConcurrentHashMap在进行put操作时是比较复杂的,大致可以分为以下步骤:
- 根据key计算出hashcode;
- 判断是否需要进行初始化;
- 即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用
CAS
尝试写入,失败的话则执行自旋保证成功; - 如果当前位置的hashCode == MOVED == -1,则需要进行扩容;
- 如果都不满足,则利用synchronized锁写入数据;
- 如果数量大于
TREEIFY_THRESHOLD
则要转换为红黑树。
9.4、CAS(乐观锁)
CAS
是乐观锁的一种实现方式,是一种轻量级锁,JUC中很多工具类的实现就是基于CAS的。
CAS操作的流程如下图所示,线程在读取
数据时不进行加锁,在准备写回数据时,比较原值是否被修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。
这是一种乐观策略,认为并发操作并不总会发生。
乐观锁在开发场景中非常常用。
举个栗子,就比如我们现在要修改数据库中的一条数据,修改之前我们要先拿到它原来的值,然后在SQL语句中还要加一个判断,就是原来的值
和我们现在拿到的它的原来的值
是否一样,一样的话就可以修改了,不一样的话就证明被别的线程修改了,那我们就直接return错误就妥了。
SQL代码大概如下:
update a set value = newValue where value = #{oldValue}//oldValue就是我们执⾏前查询出来的值
9.4.1、CAS是否一定能保证数据没被别的别的线程修改过?
当然不能,比如很经典的ABA问题,CAS就无法判断,其实这一点我们在上述9.2.3、快速失败机制(fail-fast)
中也讲到了类似的问题。
9.4.2、什是ABA?
就是说一个线程把值改为了B,另一个线程把值又改为了A,对于这个时候判断的线程,就发现它的值还是A,所以它就不知道这个值到底有没有被其他线程改动过,其实在大多数场景下是只追求最终结果,是正确的就行。
但是在实际开发中,是需要记录每一步的修改过程的,比如银行转账资金修改什么的,你每次修改的都应该是有记录的,方便回溯。
9.4.3、如何解决ABA问题?
方法1:版本号验证
用版本号去进行验证(可自定义组合而成)
就妥了,就比如说,我在修改前去查询它原来的值所对应的版本号
,每次判断就连值和版本号一起进行判断,判断成功就给版本号加1,因为这又是一次操作要被记录下来。
update a set value = newValue ,vision = vision + 1 where value = #{oldValue} and vision = #{vision} // 判断原来的值和版本号是否匹配,中间有别的线程修改,值可能相等,但是版本号100%不⼀样
所以这样就解决了ABA问题,保证了一个线程操作的数据不被别的线程所修改。
方法2:时间戳
其实时间戳和版本号作用是一样的,异曲同工,对一个数据进行操作的时候把时间戳也带上,我们知道时间戳每一刻都在变化,所以任何时候的操作只要记录下来时间戳,后续再对该数据进行操作时带上时间戳进行判断就解决了ABA问题。
9.5、CAS乐观锁效率很高,synchronized效率不是很好,为啥JDK1.8之后反而增加了synchronized同步锁的数量?
synchronized之前一直都是重量级的锁,但是后来Java官方是对他进行过升级了,它现在采用的是锁升级
的方式去做的。
针对sychronized获取锁的方式,JVM使用了锁升级的优化方式,就是先使用偏向锁优先同一线程,然后再次获取锁:
- 如果失败,就会升级为
CAS轻量级锁
; - 如果失败就会短暂
自旋
,防止线程被系统挂起; - 最后如果上述都失败,就会升级为
重量级锁
。
所以synchronized升级后,刚开始不是直接就使用重量级锁,而是通过很多轻量级锁的方式一步步升级上去的。
小总结:
JDK1.8对JDK1.7的数据结构做了很大的改动,采用红黑树之后可以保证查询效率为O(logn),甚至取消了ReentrantLock改为了synchronized同步锁,这样可以看出在新版的JDK中对synchronized优化是很到位的。