一、JDK8ConcurrentHashMap初始化

1.1、源码分析

JDK8 的 ConcurrentHashMap 中一个有五个构造方法,和 JDK8 的 HashMap 一样,这五个构造方法中都没有对 Map 内部的 Node 数组进行初始化,只是对一些变量的初始值做了处理。

JDK8 的 ConcurrentHashMap 的 Node 数组初始化是在第一次添加元素时完成的。

1、无参构造函数

创建一个的、的,数组长度为初始化长度(16)的ConcurrentHashMap对象

1
2
3
4
5
/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}

ConcurrentHashMap的数组默认初始化长度为 16,且

1
2
3
4
5
/**
* The default initial table capacity. Must be a power of 2
* (i.e., at least 1) and at most MAXIMUM_CAPACITY.
*/
private static final int DEFAULT_CAPACITY = 16;

2、传入初始容量的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Creates a new, empty map with an initial table size
* accommodating the specified number of elements without the need
* to dynamically resize.
*
* @param initialCapacity The implementation performs internal
* sizing to accommodate this many elements.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

ConcurrentHashMap 会基于传入的 initialCapacity 计算一个比这个值大且为2的幂次方的整数作为 ConcurrentHashMap 的初始容量

HashMap 一样,这个功能由 tableSizeFor 方法完成

但在ConcurrentHashMap中,即使传入的初始容量 initialCapacity 为2的幂次方,在经过 tableSizeFor 方法后也会返回一个比 initialCapacity 大的且为2的幂次方的值。

image-20210304220256402

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

类比上面的 tableSizeFor 方法和 构造方法 进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ConcurrentHashMapStudy {
private static final int MAXIMUM_CAPACITY = 1 << 30;
public static void main(String[] args) {
System.out.println(getCapacity(32));
}
public static int getCapacity(int n) {
return tableSizeFor(n + (n >>> 1) + 1);
}
private static int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
}

结果

image-20210304221040912

3、含有三个参数的构造方法

这三个参数分别为:initialCapacity (初始容量)、 loadFactor (负载因子) 和 concurrencyLevel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

含有两个参数的构造函数实际上调用了此构造函数,且 concurrencyLevel 为1

1
2
3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

4、传入一个Map的构造函数

1
2
3
4
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

5、ConcurrentHashMap 中数组的初始化时机

查看源码,我们发现 ConcurrentHashMap

中 Node 数组也是懒加载的,在第一次 put 时才创建数组。

image-20211017142032826

1.2、 sizeCtl 含义解释

注意:以上这些构造方法中,都涉及到一个变量 sizeCtl ,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样,这里我们先对这个变量不同的值的含义做一下说明,后续源码分析过程中,进一步解释

  • sizeCtl为0,代表数组未初始化, 且数组的初始容量为16

  • sizeCtl为正数

  1. 如果数组未初始化,那么其记录的是数组的初始容量
  2. 如果数组已初始化,那么其记录的是数组的扩容阈值

扩容阈值 = 数组的初始容量 * 负载因子(默认是 0.75 )

  • sizeCtl为 -1,表示数组正在进行初始化

  • sizeCtl小于0,并且不是 -1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操作

  • sizeCtl是一个被 volatile 修饰的变量,保证其可见性

1
private transient volatile int sizeCtl;

二、JDK8添加安全

2.1、put方法和putVal方法

和 HashMap一样,ConcurrentHashMap 的 put 方法底层同样调用了putVal方法

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果有空值或者空键,直接抛异常
if (key == null || value == null) throw new NullPointerException();
//基于key计算hash值,并进行一定的扰动
int hash = spread(key.hashCode());
//记录某个桶上元素的个数,如果超过8个,会转成红黑树
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组还未初始化,先对数组进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果hash计算得到的桶位置没有元素,利用cas将元素添加
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
V oldVal = null;
//对当前桶进行加锁,保证线程安全,执行元素添加操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//普通链表节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,将元素添加到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//链表长度大于/等于8,将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果是重复键,直接将旧值返回
if (oldVal != null)
return oldVal;
break;
}
}
}
//添加的是新元素,维护集合长度,并判断是否要进行扩容操作
addCount(1L, binCount);
return null;
}

putVal方法中可以发现,ConcurrentHashMap在第一次put的时候才初始化Node数组,这一点和HashMap类似

1
2
3
//如果数组还未初始化,先对数组进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();

1、使用 spread 扰动方法使得到的哈希值更加分散

与 HashMap 相同, ConcurrentHashMap 同样有一个方法来对 Key 计算得到的哈希值进行扰动,与 HashMap 不同的是,ConcurrentHashMap 还引入了一个 HASH_BITS 常量来参与计算,目的是为了让得到的哈希值一定为正数

1
2
3
4
5
// usable bits of normal node hash
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

HASH_BITS 换算为二进制结果如下,任何数与 HASH_BITS 进行运算后,得到的结果第一位都会是 0 ,而二进制的第一位为符号位,符号位 0 为正数

image-20211017151227503

2、initTable初始化数组方法

初始化 Node 数组的方法,这里使用 CAS 自旋保证线程安全

  • 如果当前 sizeCtl < 0 ,那么根据表示有线程正在对 Node 数组进行初始化,当前线程需要让出cpu调度权给正在进行初始化数组的线程
1
2
3
//如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
  • 如果 sizeCtl != 0 ,那么当前线程可以对 Node 数组进行初始化,这里使用CAS自选来对 sizeCtl 进行修改,如果修改成功就继续初始化,否则继续自旋
  • 如果修改 sizeCtl 成功,此时先对 Node 数组进行判空检查
  • 如果 Node 数组为空,就 new 一个 Node 数组,同时计算扩容阈值

由于ConcurrentHashMap的默认负载因子为 0.75 ,而 n 右移两位的值刚好等于原值n的四分之一,故 n - (n >>> 2) 正好为原值n的 0.75 倍,这里使用了位运算来避免除法提高效率。

  • 在计算完阈值之后,使用 break 跳出循环。

在 initTable 方法中没有显式加锁,而是使用 CAS 自旋的方式来保证线程安全。

当一个线程抢到 CPU 的执行权时,如果此时 sc 的值小于 0 ,那么证明已经有线程在对这个 ConcurrentHashMap 对象进行初始化,所以当前线程需要让出 CPU 的执行权。如果此时 sc 的值不小于 0 ,那么证明还没有对数组进行初始化,此时使用 CAS 保证只有一个线程对数组进行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//cas+自旋,保证线程安全,对数组进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 在这里使用双重检查防止多次初始化
if ((tab = table) == null || tab.length == 0) {
//sizeCtl为0,取默认长度16,否则去sizeCtl的值,如果 sizeCtl > 0 且数组未初始化,则 sizeCtl 为初始化的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//基于初始长度,构建数组对象
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值,并赋值给sc
sc = n - (n >>> 2);
}
} finally {
//将扩容阈值,赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

3、计算键值对添加的位置

  • 与 HashMap 相同,ConcurrentHashMap 中的寻址方法也是 Key 的哈希值与数组长度 - 1 相与

  • 在初始化完数组之后,会使用tabAt 函数计算要添加的Entry 对象在数组中的位置

这里使用两个 CAS 来保证线程安全,如果两个线程同时走到下面的 if 中,那么也可以保证线程安全,下面的 tabAt 和 casTabAt 都是 CAS 操作

1
2
3
4
5
6
7
//如果hash计算得到的桶位置没有元素,利用cas将元素添加
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
  • tabAt函数底层使用了CAS
1
2
3
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
  • 如果table数组中计算出来的位置元素为空,那么直接将直接创建一个Node节点并添加到该位置即可。

4、多线程协助扩容

如果计算出来Node数组中要插入位置的元素的hash值为MOVED ,那么证明当前 Node 数组中要插入位置的元素为 forwarding 节点,这个节点的hash值为 -1。

forwarding 节点和 多线程协助扩容 有关,当数组中某个位置存放的节点值的 hash 值为 -1 时,代表这个数组中的元素正在进行扩容操作,此时不能进行添加操作,但是可以进行协助扩容,所以这里调用了 helpTransfer 方法进行扩容操作。

1
2
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

ConcurrentHashMapMOVED 的定义如下

1
static final int MOVED     = -1; // hash for forwarding nodes

5、put方法加锁图解

Snipaste_2020-05-16_12-04-08

  • 上面的 if/else-if 分别对应 Node 数组未初始化Node 数组要添加的位置元素为空Node 数组要添加的位置正在扩容的情况。

  • 下面的 else 表示要添加的位置元素已经存在元素了,此时对该位置上的元素加锁。

  • ConcurrentHashMap 在添加元素时,只会对要加入的桶加锁,不会影响其他桶位的线程操作,锁住的是这个桶上的头节点

  • 在添加时,如果当前桶位上的头节点的哈希值大于等于0,那么证明这个桶中存放的是链表结构的元素

如果是链表元素,那么循环遍历这个链表,如果发现链表元素的 key 和待插入节点的 key 相等,那么直接用新值覆盖旧值,否则就使用尾插法插入元素

  • 如果此时桶上的元素已经树化,那么就按照红黑树的插入规则进行新增
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
else {
//hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
V oldVal = null;
//对当前桶进行加锁,保证线程安全,执行元素添加操作
synchronized (f) {
// 为什么还要做判断,防止这个桶上的元素树化
if (tabAt(tab, i) == f) {
//普通链表节点
if (fh >= 0) {
binCount = 1;
//遍历链表,逐个比较链表元素的key和待插入节点的key是否相等
//如果不是,就使用尾插法插入元素,否则就覆盖旧值
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,将元素添加到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//链表长度大于/等于8,将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果是重复键,直接将旧值返回
if (oldVal != null)
return oldVal;
break;
}
}

6、添加完元素后进行链表长度判断

  • HashMap 类似,在添加完一个元素后需要对桶元素个数数组长度进行判断,如果有一个桶中链表长度大于8数组长度大于64,就将该桶上的链表转换为一棵红黑树。
1
2
3
4
5
6
7
8
9
if (binCount != 0) {
//链表长度大于/等于8,将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果是重复键,直接将旧值返回
if (oldVal != null)
return oldVal;
break;
}
  • 树化方法 treeifyBin 只有当数组长度大于等于64的时候才会选择树化,且树化时的加锁过程也是对单一桶加锁。

如果数组长度小于64,会进行数组扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   static final int MIN_TREEIFY_CAPACITY = 64;
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 如果当前链表长度小于 64 ,则会进行扩容,而不是树化
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
  • 使用 addCount 方法维护集合长度
1
addCount(1L, binCount);

2.2、ConcurrentHashMap 空值和空键问题

HashMap是允许空键空值的,其中空键只允许有一个,空值允许有多个,但 ConcurrentHashMap 不允许有空键空值。

image-20210304223605030

spread方法是 ConcurrentHashMap 中的扰动方法 ,用于增加散列性,且保证 spread 方法返回的值一定为正数。

在HashMap的hash方法计算结果的基础上,让得到的值与 0x7fffffff 进行一个与运算。

0x7fffffff 放入计算器中计算,得到的二进制值为

01111111111111111111111111111111

任何数与 01111111111111111111111111111111 可以保证该数的第一位为0,在二进制中,第一位为0的数为正数。

1
2
3
4
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash    
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

2.3、addCount 方法源码分析

在添加数据后,ConcurrentHashMap 会调用 addCount 方法来维护集合的长度,如果发现集合中的元素大于扩容阈值,那么此时会进行扩容操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

1、baseCount 变量

  • 为了统计 ConcurrentHashMap 中的元素个数 size , ConcurrentHashMap 提供了 baseCount 和 counterCells 两个辅助变量来记录元素个数
1
2
3
// ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;

当有多个线程需要记录元素个数时,如果它们对 baseCount 的原子修改失败,那么它们会找到 counterCells 数组中的一个元素,然后对其中的一个元素进行 + 1 操作,故而 baseCount 变量的值不是准确的真实元素个数,而是 baseCount + counterCells 中所有数组元素的和。

  • 在 addCount 方法中,我们可以看到一个 sumCount 方法,这个方法就是用来计算 ConcurrentHashMap 中真实元素个数的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
// 遍历 counterCells 数组
for (int i = 0; i < as.length; ++i) {
// 如果下标对应的元素不为空,那么进行增加操作。
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

2、计算完元素个数后进行扩容

  • 如果要进行扩容,那么需要将 sizeCtl 变量变为一个负数, ConcurrentHashMap 采用了左移的方法将变量的二进制符号位变为 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// s 是上一步计算出来的, ConcurrentHashMap 中实际的元素个数,我们判断 s 变量与扩容阈值的关系
// 由于到这一步时 ConcurrentHashMap 中的数组已经扩容完毕,所以此时的 sizeCtl 变量中存储的就是扩容阈值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 第一次进来时,由于 sc 是此时的扩容阈值,所以不可能为负数,所以会进入下面的分支
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 第一次进来时,会进入这个分支,在这个操作中,会将 sc 改为一个负数(这里将 sc 进行左移,配合上面的操作令第一位为1)
// 第一个线程进来时,它要进行扩容,所以需要将 sizeCtl 改为一个负数,告诉其他线程正在扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}

transfer 方法是真正进行扩容操作的方法

三、扩容方法 transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果是扩容线程,此时新数组为null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//两倍扩容创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//记录线程开始迁移的桶位,从后往前迁移
transferIndex = n;
}
//记录新数组的末尾
int nextn = nextTab.length;
//已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//i记录当前正在迁移桶位的索引值
//bound记录下一次任务迁移的开始桶位

//--i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
//没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果没有更多的需要迁移的桶位,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//扩容任务线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断当前所有扩容任务线程是否都执行完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有扩容线程都执行完,标识结束
finishing = advance = true;
i = n; // recheck before commit
}
}
//当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//当前节点已经被迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//当前节点需要迁移,加锁迁移,保证多线程安全
//此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

3.1、线程启动扩容

addCount 方法中,如果此时的 sizeCtl 大于等于 0 ,此时将 sizeCtl 变为一个小于 0 的负数,然后调用 transfer 方法进行扩容,此时传入的参数为当前 ConcurrentHashMap 中旧的数组对象,同时在 nextTab 新数组对象的位置传入一个 null

image-20211017210220644

在线程启动扩容时,由于传入的 nextTab 对象为空,那么此时会创建一个容量为原来数组两倍的新数组,然后进行其他工作,我们查看 transfer 方法中的相关源码

  • 记录原数组的长度

image-20211017211202250

  • 进行判断,如果传入的 nextTab 为空,那么创建一个长度为原数组两倍的新数组
1
2
3
4
5
6
7
8
9
10
11
12
13
if (nextTab == null) {            // initiating
try {
@SuppressWarnings("unchecked")
// 使用左移
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}

3.2、多线程协助扩容

ConcurrentHashMap 会判断机器的 CPU 个数,如果是多 CPU ,那么每个线程划分任务,每个线程至少负责 16 个桶的数据迁移。

  • 使用一个常量用于表示当前系统的 CPU 数量
1
2
3
4
/** Number of CPUS, to place bounds on some sizings.
* 当前系统的cpu数量
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
  • 线程迁移数据的最小步长,每个线程至少需要负责 16 个桶的数据迁移
1
2
// 线程迁移数据最小步长,控制线程迁移任务最小区间一个值
private static final int MIN_TRANSFER_STRIDE = 16;
  • 分配任务

首先判断当前系统的 CPU 个数,如果 CPU 个数大于 1 ,则进行任务划分,否则直接让该线程负责全部数据的迁移

当得到的任务区间长度小于 MIN_TRANSFER_STRIDE ,那么需要将任务区间长度置为 MIN_TRANSFER_STRIDE

1
2
3
//如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
  • 创建一个 Forwarding 节点,对于已经迁移完成的桶,会使用这个节点占位(这个节点的哈希值为 MOVED)

image-20211017212807945

  • ConcurrentHashMap 的数据迁移工作是由后往前迁移的
1
2
3
4
5
6
7
8
int n = tab.length, stride;
...
//如果是扩容线程,此时新数组为null
if (nextTab == null) { // initiating
...
//记录线程开始迁移的桶位,从后往前迁移
transferIndex = n;
}
  • 计算当前线程负责的区域
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
while (advance) {
int nextIndex, nextBound;
//i记录当前正在迁移桶位的索引值
//bound记录下一次任务迁移的开始桶位

//--i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
//没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
// 如果 nextIndex 大于 stride ,证明下一个线程依然可以分到,任务,如果小于 stride ,那么证明剩下的工作这个线程就可以完成,没必要y
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}