内容简介:在桶列表Table默认初始大小为n=16,最大为n=2^31,负载阈值为0.75*n,当桶中普通链表的元素数量超过8个就会转成红黑二叉树,当桶中红黑树的元素减少到6个就会转成普通的单链表形式。在扩容的过程中,每个线程转移数据的索引数量步伐为对于table大小为n的表格,其散列计算方法为
# ConcurrentHashMap的源码完全分析
一. 概述
在 ConcurrentHashMap
内部实现中,一个有table列表,列表中的元素指向一个桶( bin
),该桶的元素头有以下三种:
-
普通链表节点:通常是桶中元素小于8个,就是一个单链表,头元素
hash > 0
。 -
转移节点(
MOVED
): 表明当前正长扩容中,当前的节点元素已经被转移到新table中,头元素hash = -1
。 -
树节点(
TREEBIN
): 表示当前的桶是一个红黑二叉树桶,头元素hash = -2
。 -
占位节点(
RESERVED
):一般用于当key对应的值缺失需要计算的场景,在计算出新值之前临时占坑位用的,计算出来之后就用普通Node节点替换掉,头元素hash = -3
。
桶列表Table默认初始大小为n=16,最大为n=2^31,负载阈值为0.75*n,当桶中普通链表的元素数量超过8个就会转成红黑二叉树,当桶中红黑树的元素减少到6个就会转成普通的单链表形式。在扩容的过程中,每个线程转移数据的索引数量步伐为 Max(NCPU > 1 ? (n >>> 3) / NCPU : n, 16)
,最小值为16。
二. 源码分析
2.1 hash的散列计算方法
对于table大小为n的表格,其散列计算方法为 ((hash^(hash >>> 16))&0X7FFFFFFF) & n
,其中n为2的幂值( n = 2^x
),源码如下:
// http://www.easysb.cn/?p=325&preview=true // Jekkay Hu static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash ... int h = spread(key.hashCode()); ... static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
2.2 构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
其中 sizeCtl
的用作表初始化和扩容控制,具体可以参考注释,下面代码也有解释。
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
2.3 有意思的函数
-
函数
tableSizeFor
,功能是向上取2的幂的函数,如下
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
-
根据索引获取表对应桶的几个主要方法,使用
sun.misc.Unsafe
的方法:
// 功能: return tab[i] static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } // 功能: if(tab[i] == c) tab[i] = v static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } // 功能: tab[i] = v static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
2.4 元素插入
元素的插入较为复杂,可以直接看源代码的中文注释。
// 插入数据,putIfAbsent表示只有缺失时才插入,否则强制更新 /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 不允许空值 if (key == null || value == null) throw new NullPointerException(); // 计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 初始化表格 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // tabAt原子操作f= tab[i],找到表格中对应的桶,i为hash投影的索引 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // casTabAt原子操作,主要是tab[i]= new Node(...),如果失败则表示 // 其他线程已经设置,就for中可以直接获取 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 假如桶的头节点为ForwardingNode tab = helpTransfer(tab, f); else { V oldVal = null; // 普通的节点,将头锁住方便更新 synchronized (f) { // 再次检测下,如果不等说明被其他线程更新,等待一次重新操作 if (tabAt(tab, i) == f) { if (fh >= 0) { // 如果是正常节点,直接往后查询 // binCount表示当前桶中节点的数量 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 如果已经找到,则根据onlyIfAbsent是否更新 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 假如没找到则追加到链尾 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 当前的桶是一个红黑二叉树的头结点 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // binCount设置为2,防止转化成二叉树 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 假如当前桶的节点数量大于8个,那就该桶的数据转成红黑二叉树形式 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 元素个数增加1,binCount表示当前插入数据所在桶元素序号(1,2...),根据条件决定是否扩容 addCount(1L, binCount); return null; }
初始化表格逻辑逻辑:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 此时正在扩容,或者初始化,需要让出线程 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 使用unsafe原子操作,假如等于sc,则将sizeCtl设置为-1,表示正在初始化 // compareAndSwapInt可以避免多个线程同时对表格初始化,如果有其他线程 // 已经重置为-1,那么当前的线程就不需要对表格进行初始化操作。 try { // 二次确认当前还没被初始化 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 全局变量LOAD_FACTOR = 0.75f 表示 负载阈值 // 计算负载阈值sc = n * 0.75,采用以下移位操作更快 sc = n - (n >>> 2); } } finally { // 初始化之后,更新为负载阈值,超过会重新扩容 sizeCtl = sc; } break; } } return tab; }
addCount
累加表的数据量,check表示当前添加元素所在桶的序列(1,2…),并判断是否需要扩容。sizeCtl为正数,表示容器的负载阈值。当开始扩容时,sizeCtl首次会赋值为负数 sizeCtl=((Integer.numberOfLeadingZeros(n) | (1 << 15)) << 16) + 2
,如果其线程再次进入时会将sizeCtl加1
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 总数baseCount+1 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 多个线程同时进入,sizeCtrl + 1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 将sizeCtrl设置负数,然后开始转移 transfer(tab, null); s = sumCount(); } } }
transfer
函数将原来表中的元素转移到新表中,旧表i桶中的元素要不转移到新表低位i桶或者高位(i+n)桶。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 计算转移的步伐 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating // 第一个线程,首次进来初始化 try { // 容量扩大1倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 保存新表在成员变量中 nextTable = nextTab; // 保存下一张表大小,方便转移 transferIndex = n; } int nextn = nextTab.length; // 创建一个转移节点 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; // 用来标记是否完成,方便提交更改 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 先获取锁定一下转移数据表的索引范围值bound ~ i while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) // 不停往前移动表索引 advance = false; else if ((nextIndex = transferIndex) <= 0) { // 如果是<= 0,表明转移全部完成 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // transferIndex减去步伐stride,下个线程就从新的索引处nextBound转移数据 bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 将新表替换旧表 nextTable = null; table = nextTab; // 更新负载阈值为 n * 0.75 sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 当前的线程转移完成 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 判断是否还有其他的线程在转移 return; // 转移完成 finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) // 如原来的表没有数据,则直接设置一个转移节点到旧表中 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // 有可能其他的线程已经处理过 advance = true; // already processed else { // 锁定表中对应桶,然后进行数据转移,元素要不在在原来i对应的低位桶,要不就在高位(i+n)对应桶 synchronized (f) { if (tabAt(tab, i) == f) { // 二次确认,防止别其他线程抢 Node<K,V> ln, hn; if (fh >= 0) { // hash值大于0,普通的链表 int runBit = fh & n; Node<K,V> lastRun = f; // 找到最后一个转以后不在当前索引对应桶中,而是在新扩对应(i+n)索引对应桶中 // 因为元素散列算法就是 (hash^(hash>>>16)&0x7FFFFFFF) & (2^x -1),扩容一次 // x的值就增加1,表大小容量是2倍增(n=2^x),所以当hash&n=hash&2^x的值不同时, // 则说明该元素在新的表中会散列到高位索引(i+n)中 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { // 如果没有找到,则表明该桶中的所有元素在新表对应的位置不变 ln = lastRun; hn = null; } else { // 如果找到了,那么说明该桶中的元素需要重新散列 hn = lastRun; ln = null; } // 重新建立新链表 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 设置新表低位、高位的头,并将原来的节点设置为转移节点 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { // 二叉树 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 将二叉树的节点分成低位元素和高位元素链表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { // 低位元素 if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { // 高位元素 if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果元素链表数量小于6,则退化成链表,否则就生产二叉树 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 设置新表的头,并且将旧表设置为转移节点 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
至此,map的插入和扩容主要逻辑都已经非常清晰了,而二叉树的插入和删除细节较多,但是功能相对来说非常简单。 putTreeVal
表示查找二叉树上对应的节点,如果没找到则自动生成一个节点,具体的红黑二叉树的旋转之类的就不再赘述了,网上太多资料。
final TreeNode<K,V> putTreeVal(int h, K k, V v) { Class<?> kc = null; boolean searched = false; for (TreeNode<K,V> p = root;;) { int dir, ph; K pk; if (p == null) { // 当前桶为空,则直接创建一个新的节点 first = root = new TreeNode<K,V>(h, k, v, null, null); break; } else if ((ph = p.hash) > h) // hash值大于当前key的,在左子树 dir = -1; else if (ph < h) // hash值小于当前key的,在右子树 dir = 1; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) // 等于,则比较key是否相等 return p; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { // 如果hash值相等,那么就比较下是否是String或者实现了Comparable接口 // 如果等出现二次碰撞,则左右子树都搜索 if (!searched) { TreeNode<K,V> q, ch; searched = true; // 找到对应的节点 if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) // 找到直接返回即可 return q; } // 假如没有找到,则根据类名、System.identityHashCode比较来确定到底放在左右哪边 dir = tieBreakOrder(k, pk); } TreeNode<K,V> xp = p; // 根据比较的大小dir来确定插入左子树还是右子树 if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode<K,V> x, f = first; first = x = new TreeNode<K,V>(h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) // <=0 左子树 xp.left = x; else // >0 右子树 xp.right = x; if (!xp.red) x.red = true; else { // 加锁,调整平衡 lockRoot(); try { root = balanceInsertion(root, x); } finally { unlockRoot(); } } break; } } assert checkInvariants(root); return null; } // 多次冲突碰撞之后,就用此来决定谁大谁小,以区分在左还是在右子树上 static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a.getClass().getName(). compareTo(b.getClass().getName())) == 0) d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1); return d; }
再回头开始的,假如对于MOVE节点( forwarding nodes
),也是说当前有其他的线程正在扩容,整体尚未完成,但是当前的节点已经完成了转移,那此时的处理代码为:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 多个线程同时进入,sizeCtrl + 1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
2.5 查找
查找元素更为简单,找到列表中对应的桶,根据不同桶的类型,调用的不同的方法找对应元素。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算对应的hash值 int h = spread(key.hashCode()); // 找到对应的数组对应的桶,数组n的必须2的次方(n = 2^i, 其中i<=30) // (n - 1) & h) 表示将hash值投影在0 ~ n-1之间的索引值 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { // 假如hash值相等,key也相等,那么表示找到 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) // 如果hash值小于0,说明是特殊节点ForwardingNode,ReservationNode return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // eh>=0,为普通节点,继续搜索链表后面的节点列表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
以上所述就是小编给大家介绍的《ConcurrentHashMap的源码完全分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Code Complete
Steve McConnell / Microsoft Press / 2004-6-19 / GBP 40.99
在线阅读本书 Widely considered one of the best practical guides to programming, Steve McConnells original CODE COMPLETE has been helping developers write better software for more than a decade. Now......一起来看看 《Code Complete》 这本书的介绍吧!