数据结构与算法学习(七)-ConcurrentHashMap源码解读
一、JDK8ConcurrentHashMap初始化
1.1、源码分析
在 JDK8 的 ConcurrentHashMap 中一个有五个构造方法,和 JDK8 的 HashMap 一样,这五个构造方法中都没有对 Map 内部的 Node 数组进行初始化,只是对一些变量的初始值做了处理。
JDK8 的 ConcurrentHashMap 的 Node 数组初始化是在第一次添加元素时完成的。
1、无参构造函数
创建一个新的、空的,数组长度为初始化长度(16)的ConcurrentHashMap对象
1 | /** |
ConcurrentHashMap的数组默认初始化长度为
16
,且
1 | /** |
2、传入初始容量的构造函数
1 | /** |
ConcurrentHashMap 会基于传入的
initialCapacity
计算一个比这个值大且为2的幂次方的整数作为 ConcurrentHashMap 的初始容量。和 HashMap 一样,这个功能由
tableSizeFor
方法完成但在ConcurrentHashMap中,即使传入的初始容量
initialCapacity
为2的幂次方,在经过tableSizeFor
方法后也会返回一个比initialCapacity
大的且为2的幂次方的值。
1 | /** |
类比上面的
tableSizeFor
方法和 构造方法 进行测试
1 | public class ConcurrentHashMapStudy { |
结果
3、含有三个参数的构造方法
这三个参数分别为:
initialCapacity
(初始容量)、loadFactor
(负载因子) 和concurrencyLevel
1 | /** |
含有两个参数的构造函数实际上调用了此构造函数,且 concurrencyLevel 为1
1 | public ConcurrentHashMap(int initialCapacity, float loadFactor) { |
4、传入一个Map的构造函数
1 | public ConcurrentHashMap(Map<? extends K, ? extends V> m) { |
5、ConcurrentHashMap 中数组的初始化时机
查看源码,我们发现 ConcurrentHashMap
中 Node 数组也是懒加载的,在第一次 put 时才创建数组。
1.2、 sizeCtl 含义解释
注意:以上这些构造方法中,都涉及到一个变量
sizeCtl
,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样,这里我们先对这个变量不同的值的含义做一下说明,后续源码分析过程中,进一步解释
sizeCtl
为0,代表数组未初始化, 且数组的初始容量为16sizeCtl
为正数
- 如果数组未初始化,那么其记录的是数组的初始容量
- 如果数组已初始化,那么其记录的是数组的扩容阈值
扩容阈值 = 数组的初始容量 * 负载因子(默认是 0.75 )
sizeCtl
为 -1,表示数组正在进行初始化sizeCtl
小于0,并且不是 -1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操作sizeCtl
是一个被 volatile 修饰的变量,保证其可见性
1 | private transient volatile int sizeCtl; |
二、JDK8添加安全
2.1、put方法和putVal方法
和 HashMap一样,ConcurrentHashMap 的 put 方法底层同样调用了putVal方法
1 | public V put(K key, V value) { |
1 | final V putVal(K key, V value, boolean onlyIfAbsent) { |
在
putVal
方法中可以发现,ConcurrentHashMap在第一次put
的时候才初始化Node数组,这一点和HashMap类似
1 | //如果数组还未初始化,先对数组进行初始化 |
1、使用 spread 扰动方法使得到的哈希值更加分散
与 HashMap 相同, ConcurrentHashMap 同样有一个方法来对 Key 计算得到的哈希值进行扰动,与 HashMap 不同的是,ConcurrentHashMap 还引入了一个
HASH_BITS
常量来参与计算,目的是为了让得到的哈希值一定为正数
1 | // usable bits of normal node hash |
将
HASH_BITS
换算为二进制结果如下,任何数与HASH_BITS
进行运算后,得到的结果第一位都会是 0 ,而二进制的第一位为符号位,符号位 0 为正数
2、initTable初始化数组方法
初始化 Node 数组的方法,这里使用 CAS 自旋保证线程安全
- 如果当前
sizeCtl < 0
,那么根据表示有线程正在对 Node 数组进行初始化,当前线程需要让出cpu调度权给正在进行初始化数组的线程
1 | //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu |
- 如果
sizeCtl != 0
,那么当前线程可以对 Node 数组进行初始化,这里使用CAS自选来对sizeCtl
进行修改,如果修改成功就继续初始化,否则继续自旋 - 如果修改
sizeCtl
成功,此时先对 Node 数组进行判空检查 - 如果 Node 数组为空,就 new 一个 Node 数组,同时计算扩容阈值
由于ConcurrentHashMap的默认负载因子为 0.75 ,而 n 右移两位的值刚好等于原值n的四分之一,故 n - (n >>> 2) 正好为原值n的 0.75 倍,这里使用了位运算来避免除法提高效率。
- 在计算完阈值之后,使用
break
跳出循环。
在 initTable 方法中没有显式加锁,而是使用 CAS 自旋的方式来保证线程安全。
当一个线程抢到 CPU 的执行权时,如果此时 sc 的值小于 0 ,那么证明已经有线程在对这个 ConcurrentHashMap 对象进行初始化,所以当前线程需要让出 CPU 的执行权。如果此时 sc 的值不小于 0 ,那么证明还没有对数组进行初始化,此时使用 CAS 保证只有一个线程对数组进行初始化。
1 | private final Node<K,V>[] initTable() { |
3、计算键值对添加的位置
与 HashMap 相同,ConcurrentHashMap 中的寻址方法也是 Key 的哈希值与数组长度 - 1 相与
在初始化完数组之后,会使用
tabAt
函数计算要添加的Entry
对象在数组中的位置
这里使用两个 CAS 来保证线程安全,如果两个线程同时走到下面的 if 中,那么也可以保证线程安全,下面的 tabAt 和 casTabAt 都是 CAS 操作。
1 | //如果hash计算得到的桶位置没有元素,利用cas将元素添加 |
tabAt
函数底层使用了CAS
1 | static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { |
- 如果table数组中计算出来的位置元素为空,那么直接将直接创建一个Node节点并添加到该位置即可。
4、多线程协助扩容
如果计算出来Node数组中要插入位置的元素的hash值为
MOVED
,那么证明当前 Node 数组中要插入位置的元素为 forwarding 节点,这个节点的hash值为 -1。forwarding 节点和 多线程协助扩容 有关,当数组中某个位置存放的节点值的 hash 值为 -1 时,代表这个数组中的元素正在进行扩容操作,此时不能进行添加操作,但是可以进行协助扩容,所以这里调用了
helpTransfer
方法进行扩容操作。
1 | else if ((fh = f.hash) == MOVED) |
ConcurrentHashMap 中
MOVED
的定义如下
1 | static final int MOVED = -1; // hash for forwarding nodes |
5、put方法加锁图解
上面的
if/else-if
分别对应 Node 数组未初始化、Node 数组要添加的位置元素为空、Node 数组要添加的位置正在扩容的情况。下面的
else
表示要添加的位置元素已经存在元素了,此时对该位置上的元素加锁。ConcurrentHashMap 在添加元素时,只会对要加入的桶加锁,不会影响其他桶位的线程操作,锁住的是这个桶上的头节点
在添加时,如果当前桶位上的头节点的哈希值大于等于0,那么证明这个桶中存放的是链表结构的元素
如果是链表元素,那么循环遍历这个链表,如果发现链表元素的 key 和待插入节点的 key 相等,那么直接用新值覆盖旧值,否则就使用尾插法插入元素
- 如果此时桶上的元素已经树化,那么就按照红黑树的插入规则进行新增
1 | else { |
6、添加完元素后进行链表长度判断
- 和 HashMap 类似,在添加完一个元素后需要对桶元素个数及数组长度进行判断,如果有一个桶中链表长度大于8且数组长度大于64,就将该桶上的链表转换为一棵红黑树。
1 | if (binCount != 0) { |
- 树化方法
treeifyBin
只有当数组长度大于等于64的时候才会选择树化,且树化时的加锁过程也是对单一桶加锁。
如果数组长度小于64,会进行数组扩容。
1 | static final int MIN_TREEIFY_CAPACITY = 64; |
- 使用 addCount 方法维护集合长度
1 | addCount(1L, binCount); |
2.2、ConcurrentHashMap 空值和空键问题
HashMap是允许空键空值的,其中空键只允许有一个,空值允许有多个,但 ConcurrentHashMap 不允许有空键空值。
spread
方法是 ConcurrentHashMap 中的扰动方法 ,用于增加散列性,且保证spread
方法返回的值一定为正数。在HashMap的hash方法计算结果的基础上,让得到的值与
0x7fffffff
进行一个与运算。将
0x7fffffff
放入计算器中计算,得到的二进制值为
01111111111111111111111111111111
任何数与
01111111111111111111111111111111
可以保证该数的第一位为0,在二进制中,第一位为0的数为正数。
1 | static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash |
2.3、addCount 方法源码分析
在添加数据后,ConcurrentHashMap 会调用 addCount 方法来维护集合的长度,如果发现集合中的元素大于扩容阈值,那么此时会进行扩容操作。
1 | private final void addCount(long x, int check) { |
1、baseCount 变量
- 为了统计 ConcurrentHashMap 中的元素个数 size , ConcurrentHashMap 提供了 baseCount 和 counterCells 两个辅助变量来记录元素个数
1 | // ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新 |
当有多个线程需要记录元素个数时,如果它们对 baseCount 的原子修改失败,那么它们会找到 counterCells 数组中的一个元素,然后对其中的一个元素进行 + 1 操作,故而 baseCount 变量的值不是准确的真实元素个数,而是 baseCount + counterCells 中所有数组元素的和。
- 在 addCount 方法中,我们可以看到一个 sumCount 方法,这个方法就是用来计算 ConcurrentHashMap 中真实元素个数的方法
1 | final long sumCount() { |
2、计算完元素个数后进行扩容
- 如果要进行扩容,那么需要将 sizeCtl 变量变为一个负数, ConcurrentHashMap 采用了左移的方法将变量的二进制符号位变为 1
1 | if (check >= 0) { |
transfer
方法是真正进行扩容操作的方法
三、扩容方法 transfer
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
3.1、线程启动扩容
在
addCount
方法中,如果此时的 sizeCtl 大于等于 0 ,此时将 sizeCtl 变为一个小于 0 的负数,然后调用transfer
方法进行扩容,此时传入的参数为当前ConcurrentHashMap
中旧的数组对象,同时在nextTab
新数组对象的位置传入一个null
在线程启动扩容时,由于传入的 nextTab 对象为空,那么此时会创建一个容量为原来数组两倍的新数组,然后进行其他工作,我们查看 transfer 方法中的相关源码
- 记录原数组的长度
- 进行判断,如果传入的 nextTab 为空,那么创建一个长度为原数组两倍的新数组
1 | if (nextTab == null) { // initiating |
3.2、多线程协助扩容
ConcurrentHashMap 会判断机器的 CPU 个数,如果是多 CPU ,那么每个线程划分任务,每个线程至少负责 16 个桶的数据迁移。
- 使用一个常量用于表示当前系统的 CPU 数量
1 | /** Number of CPUS, to place bounds on some sizings. |
- 线程迁移数据的最小步长,每个线程至少需要负责 16 个桶的数据迁移
1 | // 线程迁移数据最小步长,控制线程迁移任务最小区间一个值 |
- 分配任务
首先判断当前系统的 CPU 个数,如果 CPU 个数大于 1 ,则进行任务划分,否则直接让该线程负责全部数据的迁移
当得到的任务区间长度小于
MIN_TRANSFER_STRIDE
,那么需要将任务区间长度置为MIN_TRANSFER_STRIDE
1 | //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移 |
- 创建一个 Forwarding 节点,对于已经迁移完成的桶,会使用这个节点占位(这个节点的哈希值为 MOVED)
- ConcurrentHashMap 的数据迁移工作是由后往前迁移的
1 | int n = tab.length, stride; |
- 计算当前线程负责的区域
1 | while (advance) { |