个中:
数组:Node类型的数组,数组中的每个元素相称于一个哈希桶:如果不发生哈希冲突,则每个元素都存储在数组中。如果发生哈希冲突,则每个元素存储的是链表的头节点或红黑树的根节点。链表:Node类型的链表,当发生哈希冲突时,通过链地址法将具有相同哈希值的Node节点组成一个链表。红黑树:当链表的长度大于8且数组的长度大于即是64时,将链表转换为红黑树进行存储。主要属性ConcurrentHashMap的主要属性及常量:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { // Node数组,用于存储ConcurrentHashMap中的数据 transient volatile Node<K,V>[] table; // 扩容时产生的新的Node数组,其容量为table数组的2倍 private transient volatile Node<K,V>[] nextTable; / sizeCtl值: 0:表示数组未初始化,且数组的初始容量为16 正数:如果数组未初始化,则该值记录的是数组的初始容量;如果数组已经初始化,则该值记录的是数组的扩容阈值(数组初始容量0.75) -1:表示数组正在进行初始化 -N:表示数组正在进行扩容,参与扩容的线程数为N-1 / private transient volatile int sizeCtl; // Node数组最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; // Node数组默认容量 private static final int DEFAULT_CAPACITY = 16; // 扩容因子 private static final float LOAD_FACTOR = 0.75f; / 链表转红黑树阀值: 如果链表长度大于达到阀值,但数组容量小于64,则进行数组扩容。 如果链表长度大于达到阀值,且数组容量大于即是64,则将链表转为红黑树。 / static final int TREEIFY_THRESHOLD = 8; // 红黑树转链表阀值,如果红黑树中的节点个数达到阀值,则将红黑树转为链表。 static final int UNTREEIFY_THRESHOLD = 6; // 链表转红黑树数组容量最小值 static final int MIN_TREEIFY_CAPACITY = 64;}
主要构造Node
Node节点紧张用于封装key-value键值对,ConcurrentHashMap中的链表由多个相连的Node节点组成。

static class Node<K,V> implements Map.Entry<K,V> { // key的hash值 final int hash; // key键 final K key; // value值 volatile V val; // 指向下一个Node节点的指针 volatile Node<K,V> next; ...}
ForwardingNode
ForwardingNode紧张用于数组进行扩容时标识哈希桶对应的头节点,ForwardingNode中不保存key-value键值对,只保存扩容时产生的新的Node数组(nextTable)的引用。ForwardingNode中的hash值为MOVED(-1),它定义了一个find()方法,用于扩容时从nextTable中查找对应的Node节点。
TreeBin当ConcurrentHashMap中的链表转为红黑树后,数组中保存的引用为TreeBin,TreeBin内部不保存key-value键值对,而是保存TreeNode的List和红黑树Root。
TreeNode红黑树的节点,TreeNode继续了ConcurrentHashMap.Node。
布局函数ConcurrentHashMap的布局函数:
// 无参布局函数public ConcurrentHashMap() {}// 带初始化容量的布局函数public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); / 如果数组的初始容量大于(最大容量/2),则初始化容量大小为最大容量。 否则初始化容量大小为大于(initialCapacity+initialCapacity/2+1)的最小2^n值。 假设initialCapacity为8,则大于(8+8/2+1)的最小2^n为16,即:cap = 16 / int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 设置sizeCtl变量的值为数组的初始化容量 this.sizeCtl = cap;}// 带凑集的布局函数public ConcurrentHashMap(Map<? extends K, ? extends V> m) { // 设置sizeCtl变量的值为数组的默认容量 this.sizeCtl = DEFAULT_CAPACITY; // 将凑集中所有的key-value键值对添加到ConcurrentHashMap中 putAll(m);}// 带初始化容量和负载因子的布局函数public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1);}// 带初始化容量、负载因子以及并发线程数的布局函数public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 如果初始容量小于并发线程数,则初始容量为并发线程数的大小 if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; // 数组大小 long size = (long)(1.0 + (long)initialCapacity / loadFactor); / 如果数组大小大于即是最大容量,则初始化容量大小为最大容量。 否则初始化容量大小为大于size的最小2^n值。 / int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); // 设置sizeCtl变量的值为数组的初始化容量 this.sizeCtl = cap;}
put方法实现事理
ConcurrentHashMap通过ConcurrentHashMap#put方法向个中添加key-value键值对。
ConcurrentHashMap#put方法实行流程,如图所示:
处理流程:
1)向ConcurrentHashMap中添加key-value键值对,打算key对应的hash值,自旋遍历table数组并进行判断:如果table数组还未初始化,则初始化table数组。如果key的hash值对应的数组下标处不存在Node节点,则根据key的hash值、key、value等值创建Node节点,并将创建的Node节点通过CAS的办法设置到hash值对应的数组下标处,退出自旋。如果key的hash值对应的数组下标处存在Node节点,且Node节点的hash值为-1(即:ForwardingNode),解释正在进行数组扩容,则当前哨程帮忙数组扩容,直到数组扩容完成,连续自旋。如果key的hash值对应的数组下标处存在Node节点,且Node节点的hash值不为-1(即:正常的Node节点),则锁定该下标对应的哈希槽,并根据哈希槽中头节点类型进行判断:如果节点类型为Node,解释哈希槽存储的是链表,则遍历链表并进行判断:如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断:true:退出链表遍历。false:用传入的value更换旧value,退出链表遍历。如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建Node节点并追加到链表的末端(尾插法),退出链表遍历。如果节点类型为TreeBin,解释哈希槽存储的是红黑树,则遍历红黑树并进行判断:如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断:true:不做处理。false:用传入的value更换旧value。如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建TreeNode节点添加到红黑树中。判断binCount的值是否为0:0:表示添加失落败,连续自旋。非0:表示添加成功,链表中的节点数是否达到8个:节点数达到8个:如果table数组大小达到64,则将链表转为红黑树;否则进行数组扩容。旧value是否为空:不为空,则返回旧value。2)增加节点总数,并返回null。源码解析ConcurrentHashMap#put方法源码解析:
public V put(K key, V value) { // 添加key-value键值对 return putVal(key, value, false);}/ 添加key-value键值对详细实现 /final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果key或者value为空,则抛出空指针非常 if (key == null || value == null) throw new NullPointerException(); // 打算key的hash值 int hash = spread(key.hashCode()); int binCount = 0; // 自旋遍历table数组 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table数组未初始化,则初始化table数组 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 如果根据key的hash值打算得到的数组下标(i)处不存在对应的Node节点,则根据key的hash值、key、value等值创建新的Node节点,并以CAS的办法将新创建的Node节点添加到数组对应的下标处 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 通过CAS的办法不才标i处创建Node节点 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果Node节点的hash值为MOVED(-1),解释正在进行数组扩容,则帮忙数组扩容,直到数组扩容完成 else if ((fh = f.hash) == MOVED) // 帮忙数组扩容 tab = helpTransfer(tab, f); else { // 此处表示根据key的hash值打算得到的数组下标(i)处已经存在对应的Node节点 V oldVal = null; // 锁定该下标对应的哈希槽 synchronized (f) { // 判断数组下标i处的Node节点是否被修正 if (tabAt(tab, i) == f) { // f节点的hash值大于即是0,解释是正常的Node节点(非转移节点) if (fh >= 0) { // 链表中已遍历的节点数 binCount = 1; // 遍历链表 for (Node<K,V> e = f;; ++binCount) { K ek; / 如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断: true:退出链表遍历,连续自旋。 false:用传入的value更换旧value,退出链表遍历,连续自旋。 / if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // onlyIfAbsent为false,则用传入的value更换旧value if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建Node节点并追加到链表的末端(尾插法) if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果根据key的hash值打算得到的数组下标(i)处存放的是红黑树,则根据key的hash值、key、value等值创建TreeNode节点并添加到红黑树中 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // binCount不即是0,解释已经添加成功 if (binCount != 0) { // 如果binCount大于即是8,则将链表转化为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null;}
table数组初始化
ConcurrentHashMap中的table数组采取
ConcurrentHashMap#initTable方法源码解析:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 如果table数组未初始化,则自旋进行初始化 while ((tab = table) == null || tab.length == 0) { // sizeCtl小于0,解释存在其他线程正在进行初始化,则让出CPU利用权 if ((sc = sizeCtl) < 0) // 让出CPU利用权 Thread.yield(); // lost initialization race; just spin // 通过CAS的办法将sizeCtl的值更新为-1(-1表示数组正在进行初始化) else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 双重检讨,再次判断table数组是否初始化 if ((tab = table) == null || tab.length == 0) { // table数组容量:如果sc大于0,则设置sc的值为table数组的初始容量,否则设置table数组的初始容量为默认值(16) int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 创建Node类型的数组,并赋值给table变量 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 设置sc的值为table数组初始容量的3/4(即:75%) sc = n - (n >>> 2); } } finally { // 将sc的值赋给sizeCtl变量,初始化完成 sizeCtl = sc; } break; } } return tab;}
根据key的hash值打算数组下标
根据key的hash值打算方法:
static final int spread(int h) { // h ^ (h >>> 16):高位运算,即:key.hashCode()的高16位异或低16位 return (h ^ (h >>> 16)) & HASH_BITS;}
个中,参数h为key调用hashCode()方法得到的值。
假设数组长度为16,数组下标打算过程,如图所示:
帮忙扩容
在put方法中,通过遍历table数组中的Node节点向ConcurrentHashMap中添加key-value键值对时,如果key的hash值对应的数组下标处的头节点为ForwardingNode节点(即:节点中的hash值为-1),解释正在进行数组扩容,则通过调用ConcurrentHashMap#helpTransfer方法帮忙数组扩容,直到数组扩容完成。
ConcurrentHashMap#helpTransfer方法源码解析:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 如果table数组不为空且Node节点类型为ForwardingNode节点,并且ForwardingNode节点的nextTable引用不为空,则帮忙数组扩容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 获取扩容标识 int rs = resizeStamp(tab.length); // 自旋,个中sizeCtl<0表示正在进行数组扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { / 知足以下四个条件,则退出自旋(即:不实行扩容操作) 条件1:当前哨程获取的扩容标识是否与通过sc打算出来的扩容标识同等,不一致则退出循环 条件2:默认第一个线程设置为sc==(rs<<16)+2,当第一个线程扩容结束sc的值-1,即:sc==(rs<<16)+1,表示扩容结束 条件3:当前帮忙扩容的线程数达到最大值 条件4:转移下标小于0,表示扩容完成 / if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 通过CAS的办法将sizeCtl变量的值+1,更新成功则实行数据转移 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 实行数据转移 transfer(tab, nextTab); break; } } // 返回扩容后的数组 return nextTab; } return table;}
get方法实现事理
ConcurrentHashMap通过ConcurrentHashMap#get方法获取key对应的value值。
ConcurrentHashMap#get方法实行流程,如图所示:
处理流程:
根据key从ConcurrentHashMap中获取value值,打算key对应的hash值,根据table数组及hash值对应的数组下标处是否存在节点进行判断:如果数组为空或者hash值对应的下标处不存在节点,则返回null。如果数组不为空且hash值对应的下标处存在节点:连续判断头节点的hash值以及key值是否与传入key的hash值以及key值相等:相等,则返转头节点的value值。头节点的hash值与传入key的hash值不相等且头节点的hash值小于0:头节点的hash值为-1:从nextTable数组(扩容后的数组)中查询,并返回查询结果。头节点的hash值为-2:从红黑树中查询,并返回查询结果。头节点的hash值与传入key的hash值不相等且头节点的hash值大于即是0,则遍历key的hash值对应的数组下标处的链表:存在与key值相等的节点,则返回节点的value值。不存在与key值相等的节点,则返回null。源码解析ConcurrentHashMap#get方法源码解析:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 打算key的hash值 int h = spread(key.hashCode()); // table数组不为空且key的hash值对应的数组下标处存在节点 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果头节点的hash值与key的hash相等,则连续判断头节点的key值与传入的key值相等 if ((eh = e.hash) == h) { // 如果头节点的key值与传入key值相等,则返转头节点的value值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } / 如果key的hash值对应的数组下标处的头节点hash值小于0,解释正在进行数组扩容,则从nextTable数组(扩容后的数组)中查找key对应的value值 根据eh的值分两种情形: 1)-1:表示ForwardingNode,调用ForwardingNode中的find()方法 2)-2:表示TreeBin,调用TreeBin中的find()方法 / else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 如果key的hash值对应的数组下标处的头节点的key与key不相等,则遍历链表查找key对应的value值 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } // 没有找到key对应的节点,则返回null return null;}
扩容机制实现事理
当table数组中保存的数据越来越多,数组中的空位会越来越少,从而导致添加数据时hash冲突增加,终极导致数组中的链表越来越长,降落数据查询效率。
为了防止数组中的链表过长,提高数据查询效率,ConcurrentHashMap引入了扩容机制。扩容操作并不是在数组中没有空位时才进行,当数组中已利用的哈希槽位达到扩容因子设定的比例(默认:0.75)时,就会触发对数组进行扩容操作,扩容后的数组容量为table数组的2倍。
扩容操作通过ConcurrentHashMap#tryPresize方法实现,整体流程如图所示:
处理流程:
1)设置扩容后数组容量为table数组容量的2倍。2)判断table数组是否初始化:未初始化,则初始化table数组。已初始化,则判断table数组已扩容或数组大小达到最大值:是则不实行扩容操作。否则实行扩容操作。3)根据sizeCtl变量值进行判断:sizeCtl变量值小于0,解释其他线程正在进行数组扩容,则以CAS的办法将sizeCtl变量值+1,帮忙数组扩容。sizeCtl变量值大于即是0,解释当前数组为第一个参与数组扩容的线程。4)开始数据转移,根据table数据容量及CPU核数设置当前哨程卖力转移的哈希槽。并根据哈希槽中节点的hash值判断:hash值大于即是0,则转移链表中的数据。hash值小于0,则转移红黑树中的数据。5)根据节点hash值&数组扩容位的结果(二进制与操作的结果)进行判断:如果与操作结果为0,则数据转移后在新数组中的位置保持不变。如果与操作结果为1,则数据转移后在新数组中的位置保持为原来位置+table数组容量。6)设置哈希槽点头节点为ForwardingNode(表示正在进行数组扩容)。7)当前哨程卖力的哈希槽数据转移完成,判断table数组中的所有数据是否转移完成:否,则通过CAS的办法将sizeCtl变量的值-1,当前哨程是否为末了一个扩容线程:否,退出扩容操作。是,设置转移完成标志,下次循环实行table数组中的所有数据转移完成操作。是,则清空nextTable数组,设置table数组为扩容后的数组,设置sizeCtl的值为table数组容量的0.75倍。扩容时二进制与操作假设扩容前table数组容量为16,扩容后nextTable数组容量为32。
转移过程如图所示:
图中,table数组容量与nextTable数组容量的差异是二进制值中将第5位的值设置为1,即:将table数组容量向左移动一位得到nextTable数组容量。个中:
key1值的第5位为1,扩容后的值为扩容前的值+table数组容量(即:18)。key2值的第5位为0,扩容后的值保持不变(即:5)。源码解析ConcurrentHashMap#tryPresize方法源码解析:
private final void tryPresize(int size) { // 设置变量c为扩容后的数组容量为table数组的2倍,最大不超过MAXIMUM_CAPACITY值 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; // 如果sizeCtl的值大于即是0,则自旋 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 如果table数组未初始化,则初始化table数组 if (tab == null || (n = tab.length) == 0) { // 设置n值为sizeCtl和变量c中的较大值 n = (sc > c) ? sc : c; // 以CAS的办法设置sizeCtl的值为-1,表示正在进行数组初始化 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; // 设置变量sc的值为table数组容量的75% sc = n - (n >>> 2); } } finally { // 将变量sc的值赋值给sizeCtl变量(即:sizeCtl的值为table数组容量的75%) sizeCtl = sc; } } } // 如果数组已经扩容或者table数组的容量达到最大作,则退出自旋(即:不实行扩容操作) else if (c <= sc || n >= MAXIMUM_CAPACITY) break; // 实行扩容操作 else if (tab == table) { // 获取扩容标识 int rs = resizeStamp(n); // 如果sizeCtl的值小于0,解释正在进行数组扩容,则帮忙数组扩容 if (sc < 0) { Node<K,V>[] nt; / 知足以下四个条件,则退出自旋(即:不实行扩容操作) 条件1:当前哨程获取的扩容标识是否与通过sc打算出来的扩容标识同等,不一致则退出循环 条件2:默认第一个线程设置为sc==(rs<<16)+2,当第一个线程扩容结束sc的值-1,即:sc==(rs<<16)+1,表示扩容结束 条件3:当前帮忙扩容的线程数达到最大值 条件4:转移下标小于0,表示扩容完成 / if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 通过CAS的办法将sizeCtl变量的值+1,更新成功则实行数据转移 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 实行数据转移 transfer(tab, nt); } // 此处表示当前哨程为第一个扩容线程,则nextTable数组传null(会在数据转移时新建一个容量为table数组2被的数组) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 实行数据转移,把稳:此处nextTable数组传null transfer(tab, null); } }}// 实行数据转移方法private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; / stride变量表示每个线程进行数据转移时卖力的哈希槽数 打算规则: 如果CPU核数大于1,则每个线程卖力的哈希槽数为table数组容量/8 如果CPU核数即是1,则该线程卖力全体table数组的所有哈希槽 每个卖力的哈希槽数最少为16个 / if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果传入的nextTable数组为空,则创建nextTable数组,容量为table数组的2倍 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; // 当前哨程进行数据转移的哈希槽区间 transferIndex = n; } int nextn = nextTab.length; // 创建ForwardingNode节点,并设置nextTable属性为nextTable数组的引用 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 标识某个哈希槽中的数据是否完成转移,true-表示转移完成,开启下一个哈希槽转移 boolean advance = true; // 标识table数组中的数据是否完成转移,true-表示转移完成 boolean finishing = false; // to ensure sweep before committing nextTab // 开始转移某个哈希槽中的数据,i-表示当前须要转移的哈希槽,bound-表示须要转移的哈希槽边界 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; / 根据哈希槽的转移情形设置变量i和bound i:transferIndex-1 bound:transferIndex-stride / while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 以CAS的办法设置transferIndex = transferIndex - stride else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 当前哨程卖力的哈希槽数据转移完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; // table数组的所有数据转移完成 if (finishing) { // 清空nextTable数组 nextTable = null; // 设置table数组为扩容后的数组 table = nextTab; // 设置sizeCtl的值为table数组容量的0.75倍 sizeCtl = (n << 1) - (n >>> 1); return; } // 通过CAS的办法将sizeCtl变量的值-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 如果当前哨程不是本次实行扩容操作的末了一个转移线程,则退出 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 如果当前哈希槽中不存在节点,则设置哈希槽的头节点为ForwardingNode节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果当前哈希槽中的数据已经转移完成,则设置advance为true else if ((fh = f.hash) == MOVED) advance = true; // already processed // 当前哈希槽中的数据未转移完成,则连续转移数据 else { // 锁定当前哈希槽 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 节点的hash值大于0,解释转移的事链表中的数据 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 根据节点的哈希值与n的结果是否为0将链表拆分为两个子链表 if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // (ph & n)==0,将链表中的节点设置到ln链表中,ln链表保存转移后在新数组中位置保持不变的节点 setTabAt(nextTab, i, ln); // (ph & n)!=0,将链表中的节点设置到hn链表中,hn链表保存转移后在新数组中位置为i+n的节点 setTabAt(nextTab, i + n, hn); // 设置哈希槽点头节点为ForwardingNode setTabAt(tab, i, fwd); advance = true; } // 转移红黑树中的数据 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }}
常见问题
为什么数组大小必须为2的n次方?
将数组设置为2的n次方紧张为了减少哈希碰撞的概率。
假设向ConcurrentHashMap中添加key为16、17、18三个key-value键值对,16、17、18三个key的二进制值为10000、10001、10010:
如果数组大小不是2的n次方,如:数组大小为21,那么数组大小-1为20,其二进制值为10100,分别与16、17、18的二进制值进行&运算后的结果都是10000。如果数组大小是2的n次方,如:数组大小为2^4,那么数组大小-1为15,其二进制值为1111,分别与16、17、18的二进制值进行&运算后的结果为0000、0001、0010。通过比拟,将数组设置为2的n次方显然减少了哈希碰撞的概率。
为什么查询数组下标时利用按位与(&)操作?
利用按位与操作的缘故原由是按位与操作更高效。如:数组扩容时通过按位与进行数据迁移。
利用示例/ @author 南秋同学 ConcurrentHashMap利用示例 /@Slf4jpublic class ConcurrentHashMapExample { private static ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap(); private static CountDownLatch countDownLatch = new CountDownLatch(3); public static void main(String[] args) { Runnable task = new Runnable() { @Override public void run() { AtomicInteger old; for (int i = 0; i < 10; i++) { old = count.get("南秋同学"); if (null == old) { AtomicInteger zero = new AtomicInteger(0); old = count.putIfAbsent("南秋同学", zero); if (null == old) { old = zero; } } old.incrementAndGet(); } countDownLatch.countDown(); } }; new Thread(task).start(); new Thread(task).start(); new Thread(task).start(); try { countDownLatch.await(); log.info("count:{}",count); } catch (Exception e) { e.printStackTrace(); } }}