ConcurrentHashMap的源码完全分析

栏目: 数据库 · 发布时间: 6年前

内容简介:在桶列表Table默认初始大小为n=16,最大为n=2^31,负载阈值为0.75*n,当桶中普通链表的元素数量超过8个就会转成红黑二叉树,当桶中红黑树的元素减少到6个就会转成普通的单链表形式。在扩容的过程中,每个线程转移数据的索引数量步伐为对于table大小为n的表格,其散列计算方法为

# ConcurrentHashMap的源码完全分析

一. 概述

ConcurrentHashMap 内部实现中,一个有table列表,列表中的元素指向一个桶( bin ),该桶的元素头有以下三种:

  • 普通链表节点:通常是桶中元素小于8个,就是一个单链表,头元素 hash > 0
  • 转移节点( MOVED ): 表明当前正长扩容中,当前的节点元素已经被转移到新table中,头元素 hash = -1
  • 树节点( TREEBIN ): 表示当前的桶是一个红黑二叉树桶,头元素 hash = -2
  • 占位节点( RESERVED ):一般用于当key对应的值缺失需要计算的场景,在计算出新值之前临时占坑位用的,计算出来之后就用普通Node节点替换掉,头元素 hash = -3

桶列表Table默认初始大小为n=16,最大为n=2^31,负载阈值为0.75*n,当桶中普通链表的元素数量超过8个就会转成红黑二叉树,当桶中红黑树的元素减少到6个就会转成普通的单链表形式。在扩容的过程中,每个线程转移数据的索引数量步伐为 Max(NCPU > 1 ? (n >>> 3) / NCPU : n, 16) ,最小值为16。

二. 源码分析

2.1 hash的散列计算方法

对于table大小为n的表格,其散列计算方法为 ((hash^(hash >>> 16))&0X7FFFFFFF) & n ,其中n为2的幂值( n = 2^x ),源码如下:

// http://www.easysb.cn/?p=325&preview=true
// Jekkay Hu

static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

...
int h = spread(key.hashCode());
...

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

2.2 构造函数

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

其中 sizeCtl 的用作表初始化和扩容控制,具体可以参考注释,下面代码也有解释。

/**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

2.3 有意思的函数

  • 函数 tableSizeFor ,功能是向上取2的幂的函数,如下
/**
     * Returns a power of two table size for the given desired capacity.
     * See Hackers Delight, sec 3.2
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
  • 根据索引获取表对应桶的几个主要方法,使用 sun.misc.Unsafe 的方法:

// 功能: return tab[i]
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// 功能: if(tab[i] == c) tab[i] = v
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
        Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 功能: tab[i] = v
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

2.4 元素插入

元素的插入较为复杂,可以直接看源代码的中文注释。

// 插入数据,putIfAbsent表示只有缺失时才插入,否则强制更新
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 不允许空值
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化表格
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      // tabAt原子操作f= tab[i],找到表格中对应的桶,i为hash投影的索引
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                // casTabAt原子操作,主要是tab[i]= new Node(...),如果失败则表示
                // 其他线程已经设置,就for中可以直接获取
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                // 假如桶的头节点为ForwardingNode
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 普通的节点,将头锁住方便更新
                synchronized (f) {
                // 再次检测下,如果不等说明被其他线程更新,等待一次重新操作
                    if (tabAt(tab, i) == f) { 
                        if (fh >= 0) {
                        // 如果是正常节点,直接往后查询
                        // binCount表示当前桶中节点的数量
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 如果已经找到,则根据onlyIfAbsent是否更新
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 假如没找到则追加到链尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 当前的桶是一个红黑二叉树的头结点
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2; //  binCount设置为2,防止转化成二叉树
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 假如当前桶的节点数量大于8个,那就该桶的数据转成红黑二叉树形式
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 元素个数增加1,binCount表示当前插入数据所在桶元素序号(1,2...),根据条件决定是否扩容
        addCount(1L, binCount);
        return null;
    }

初始化表格逻辑逻辑:

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0) // 此时正在扩容,或者初始化,需要让出线程
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
        // 使用unsafe原子操作,假如等于sc,则将sizeCtl设置为-1,表示正在初始化
        // compareAndSwapInt可以避免多个线程同时对表格初始化,如果有其他线程
        // 已经重置为-1,那么当前的线程就不需要对表格进行初始化操作。
                try {
                    // 二次确认当前还没被初始化
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 全局变量LOAD_FACTOR = 0.75f 表示 负载阈值
                        // 计算负载阈值sc = n * 0.75,采用以下移位操作更快
                        sc = n - (n >>> 2); 
                    }
                } finally {
                    // 初始化之后,更新为负载阈值,超过会重新扩容
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

addCount 累加表的数据量,check表示当前添加元素所在桶的序列(1,2…),并判断是否需要扩容。sizeCtl为正数,表示容器的负载阈值。当开始扩容时,sizeCtl首次会赋值为负数 sizeCtl=((Integer.numberOfLeadingZeros(n) | (1 << 15)) << 16) + 2 ,如果其线程再次进入时会将sizeCtl加1

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 总数baseCount+1
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 多个线程同时进入,sizeCtrl + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
              // 将sizeCtrl设置负数,然后开始转移
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

transfer 函数将原来表中的元素转移到新表中,旧表i桶中的元素要不转移到新表低位i桶或者高位(i+n)桶。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 计算转移的步伐
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
        // 第一个线程,首次进来初始化
            try {
                // 容量扩大1倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            // 保存新表在成员变量中
            nextTable = nextTab;
            // 保存下一张表大小,方便转移
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // 创建一个转移节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        // 用来标记是否完成,方便提交更改
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 先获取锁定一下转移数据表的索引范围值bound ~ i
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) // 不停往前移动表索引
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    // 如果是<= 0,表明转移全部完成
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                // transferIndex减去步伐stride,下个线程就从新的索引处nextBound转移数据
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    // 将新表替换旧表
                    nextTable = null;
                    table = nextTab;
                    // 更新负载阈值为 n * 0.75
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 当前的线程转移完成
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 
                    // 判断是否还有其他的线程在转移
                        return;
                    // 转移完成
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                // 如原来的表没有数据,则直接设置一个转移节点到旧表中
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                // 有可能其他的线程已经处理过
                advance = true; // already processed
            else {
                // 锁定表中对应桶,然后进行数据转移,元素要不在在原来i对应的低位桶,要不就在高位(i+n)对应桶
                synchronized (f) {
                    if (tabAt(tab, i) == f) { // 二次确认,防止别其他线程抢
                        Node<K,V> ln, hn;
                        if (fh >= 0) { // hash值大于0,普通的链表
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // 找到最后一个转以后不在当前索引对应桶中,而是在新扩对应(i+n)索引对应桶中
                            // 因为元素散列算法就是 (hash^(hash>>>16)&0x7FFFFFFF) & (2^x -1),扩容一次
                            // x的值就增加1,表大小容量是2倍增(n=2^x),所以当hash&n=hash&2^x的值不同时,
                            // 则说明该元素在新的表中会散列到高位索引(i+n)中
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                           // 如果没有找到,则表明该桶中的所有元素在新表对应的位置不变
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                            // 如果找到了,那么说明该桶中的元素需要重新散列
                                hn = lastRun;
                                ln = null;
                            }
                            // 重新建立新链表
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 设置新表低位、高位的头,并将原来的节点设置为转移节点
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                           // 二叉树
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            // 将二叉树的节点分成低位元素和高位元素链表
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) { // 低位元素
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else { // 高位元素
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 如果元素链表数量小于6,则退化成链表,否则就生产二叉树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            // 设置新表的头,并且将旧表设置为转移节点
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

至此,map的插入和扩容主要逻辑都已经非常清晰了,而二叉树的插入和删除细节较多,但是功能相对来说非常简单。 putTreeVal 表示查找二叉树上对应的节点,如果没找到则自动生成一个节点,具体的红黑二叉树的旋转之类的就不再赘述了,网上太多资料。

final TreeNode<K,V> putTreeVal(int h, K k, V v) {
  Class<?> kc = null;
  boolean searched = false;
  for (TreeNode<K,V> p = root;;) {
      int dir, ph; K pk;
      if (p == null) {
          // 当前桶为空,则直接创建一个新的节点
          first = root = new TreeNode<K,V>(h, k, v, null, null);
          break;
      }
      else if ((ph = p.hash) > h)
          // hash值大于当前key的,在左子树
          dir = -1;
      else if (ph < h)
          // hash值小于当前key的,在右子树
          dir = 1;
      else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
          // 等于,则比较key是否相等
          return p;
      else if ((kc == null &&
                (kc = comparableClassFor(k)) == null) ||
               (dir = compareComparables(kc, k, pk)) == 0) {
        // 如果hash值相等,那么就比较下是否是String或者实现了Comparable接口
        // 如果等出现二次碰撞,则左右子树都搜索
          if (!searched) {
              TreeNode<K,V> q, ch;
              searched = true;
              // 找到对应的节点
              if (((ch = p.left) != null &&
                   (q = ch.findTreeNode(h, k, kc)) != null) ||
                  ((ch = p.right) != null &&
                   (q = ch.findTreeNode(h, k, kc)) != null))
                  // 找到直接返回即可
                  return q;
          }
          // 假如没有找到,则根据类名、System.identityHashCode比较来确定到底放在左右哪边
          dir = tieBreakOrder(k, pk);
      }

      TreeNode<K,V> xp = p;
      // 根据比较的大小dir来确定插入左子树还是右子树
      if ((p = (dir <= 0) ? p.left : p.right) == null) {
          TreeNode<K,V> x, f = first;
          first = x = new TreeNode<K,V>(h, k, v, f, xp);
          if (f != null)
              f.prev = x;
          if (dir <= 0) // <=0 左子树
              xp.left = x;
          else // >0 右子树
              xp.right = x;
          if (!xp.red)
              x.red = true;
          else {
              // 加锁,调整平衡
              lockRoot();
              try {
                  root = balanceInsertion(root, x);
              } finally {
                  unlockRoot();
              }
          }
          break;
      }
  }
  assert checkInvariants(root);
  return null;
}

// 多次冲突碰撞之后,就用此来决定谁大谁小,以区分在左还是在右子树上
static int tieBreakOrder(Object a, Object b) {
  int d;
  if (a == null || b == null ||
      (d = a.getClass().getName().
       compareTo(b.getClass().getName())) == 0)
      d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
           -1 : 1);
  return d;
}

再回头开始的,假如对于MOVE节点( forwarding nodes ),也是说当前有其他的线程正在扩容,整体尚未完成,但是当前的节点已经完成了转移,那此时的处理代码为:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  Node<K,V>[] nextTab; int sc;
  if (tab != null && (f instanceof ForwardingNode) &&
      (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
      int rs = resizeStamp(tab.length);
      while (nextTab == nextTable && table == tab &&
             (sc = sizeCtl) < 0) {
          if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
              sc == rs + MAX_RESIZERS || transferIndex <= 0)
              break;
          // 多个线程同时进入,sizeCtrl + 1
          if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
              transfer(tab, nextTab);
              break;
          }
      }
      return nextTab;
  }
  return table;
}

2.5 查找

查找元素更为简单,找到列表中对应的桶,根据不同桶的类型,调用的不同的方法找对应元素。

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 计算对应的hash值
        int h = spread(key.hashCode());
        // 找到对应的数组对应的桶,数组n的必须2的次方(n = 2^i, 其中i<=30)
        // (n - 1) & h) 表示将hash值投影在0 ~ n-1之间的索引值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                // 假如hash值相等,key也相等,那么表示找到
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                // 如果hash值小于0,说明是特殊节点ForwardingNode,ReservationNode
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                // eh>=0,为普通节点,继续搜索链表后面的节点列表
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

以上所述就是小编给大家介绍的《ConcurrentHashMap的源码完全分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Black Box Society

The Black Box Society

Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00

Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具