从源码分析ConcurrentHashMap

栏目: 数据库 · 发布时间: 5年前

内容简介:作为线程安全的HashMap,Java提供了Hashtable和ConcurrentHashMap两种实现,而Hashtable控制线程安全的方式仅仅是用synchronized对方法加锁,这种低效且过时的方法已经不适合如今的开发在JDK5中,就已经出现了ConcurrentHashMap作为Hashtable的高效替代品。在JDK7及之前,ConcurrentHashMap还是使用ConcurrentHashMap中诸如最大长度、负载因子等属性和HashMap中一致,不多赘述,见之前的这篇博客:从源码分析

作为线程安全的HashMap,Java提供了Hashtable和ConcurrentHashMap两种实现,而Hashtable控制线程安全的方式仅仅是用synchronized对方法加锁,这种低效且过时的方法已经不适合如今的开发

在JDK5中,就已经出现了ConcurrentHashMap作为Hashtable的高效替代品。在JDK7及之前,ConcurrentHashMap还是使用 分段锁 的技术来提高效率,而在JDK8中,则大量采用CAS方式来保证并发安全性,接下来就会讲到ConcurrentHashMap是如何高效地解决并发冲突问题

属性结构分析

ConcurrentHashMap中诸如最大长度、负载因子等属性和HashMap中一致,不多赘述,见之前的这篇博客:从源码分析HashMap

这里我们只看重点

private static int RESIZE_STAMP_BITS = 16;
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
复制代码

上面这3个属性在HashMap中没有出现,我们注意到这些属性中都有一个RESIZE,看来是和扩容有关的,至于怎么操作,我们一会儿说到的时候再看

// 保存键值对数据的数组
	transient volatile Node<K,V>[] table;	
	// 扩容时的辅助数组
	private transient volatile Node<K,V>[] nextTable;
复制代码

上面这两个属性很好理解,我们要注意的就是前面都加上了volatile关键字,以保证并发时的可见性

// 元素个数
	private transient volatile long baseCount;
	// Map中table的状态标识
	private transient volatile int sizeCtl;
	// 扩容时的分界位置
	private transient volatile int transferIndex;
	// 额外待统计的元素个数
	private transient volatile CounterCell[] counterCells;
复制代码

后面两个属性是不是搞不懂用处是什么,没关系一会儿就会讲到,我们先看前两个属性

第一个属性baseCount显而易见就是指元素的个数,如果我们看源码中的注释,会发现这里的baseCount实际指 当没有发生线程争用时的元素个数 ,同时还作为初始化的后备属性

第二个属性sizeCtl有以下的几种情况

  • 为正数 -- 如果未初始化,表示table需要初始化的大小 -- 如果已初始化,表示table的容量(总大小的0.75倍)
  • 为负数 -- 值为-1时,表示正在初始化 -- 值为-N时,表示有N-1个线程正在初始化

然后我们来看最常用的两个构造函数,如下

public ConcurrentHashMap() {
    }
    
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
复制代码

正如我们所说,sizeCtl在未初始化table时,等于table需要初始化的大小,顺便说一下,ConcurrentHashMap和HashMap一样,都将第一次初始化延迟到了第一次put操作,这么做避免了无谓的初始化操作

辅助方法

initTable() —— 初始化table

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 只要table是空的就一直进行初始化操作
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)	// 如果table已经处在初始化过程中,就让当前线程让出cpu
                Thread.yield(); 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            	// 通过CAS操作对sizeCtl赋值-1(表示table正在初始化)
                try {
                	// 再次进行判断table是否为空(双检锁)
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
复制代码

配合注释可以很容易理解,在初始化方法中,通过两次检查table是否为空以及CAS赋值操作保证了多线程下的安全性

tabAt()和casTabAt() —— 获取/设置table索引位置值

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
复制代码

这里都调用了Unsafe对象提供的native方法,也许有人会想为什么不用table[index]来获值/赋值,要注意,ConcurrentHashMap是为了应对多线程并发的情况而存在的,在多线程下,我们并不能保证table[index]一定能获得即时的属性值,如果是修改操作则会发生修改覆盖的情况

transfer() —— 数组扩容

这个方法代码相当长,希望配合注释耐心看完

/**
     * @param tab 扩容前的数组(当前数组)
     * @param nextTab 扩容后的数组
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // NCPU为虚拟机可用的处理器数量,stride表示每个处理器需要处理的区间个数,最少为16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
        // 如果辅助数组为空(这个数组仅在扩容时不为空)
        if (nextTab == null) {
            try {
            	// 默认扩容两倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            // 转移下标(下一次处理的数组段分界位置)
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // ForwardingNode是Node的子类, 其中包含一个Node类型的数组
        // 做占位使用,可以让别的线程检查是否有其他线程在使用数组
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        // 判断扩容是否完成
        boolean finishing = false;
        // bound标记当前线程处理的区间段的最小下标
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                // 转移下标小于等于0,说明所有的区间段都处理完毕
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // CAS操作赋值失败后advance值不变,会再次循环
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                 	// bound为当前区段的最小下标
                    bound = nextBound;
                    // i为当前区段的最大下标
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // n为原数组长度,nextn为新数组的长度
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 如果扩容完成,就把nextTab赋值给table,然后结束
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 尝试将当前扩容的线程数+1(不懂为什么的,翻到上面看sizeCtl的含义)
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                	// 如果没有线程在帮助扩容,说明扩容结束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果i索引处为空,就用一个fwd来进行占位
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 如果已经被别的线程处理过了
            else if ((fh = f.hash) == MOVED)
                advance = true;
            else {
            	// 给当前索引位置节点加锁
                synchronized (f) {
                	// 再次检查一遍索引位置的值是否改变
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // 如果索引节点的hash值大于等于0(红黑树节点的hash值为-2)
                        if (fh >= 0) {
                        	// 获取在新数组上的hash索引值
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // 遍历节点链表,更新其hash值
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                	// runBit记录最后一个拥有不同的新hash值的节点
                                    runBit = b;
                                    // lastRun和其之后的节点拥有和p相同的新hash值
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // 对于新hash不同的节点,根据取余结果分为ln和hn两条链表
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 将低位链表放到i索引处
                            setTabAt(nextTab, i, ln);
                            // 将高位链表放到i+n索引处
                            setTabAt(nextTab, i + n, hn);
                            // 在旧链表处设置占位符,表示该索引节点已经被处理过
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 如果f是树节点
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
复制代码

算法很长,如果看注释看不懂的话,我在这里再解释一下

将整个部分分为三块:

  • 预处理部分(计算需要处理的区间个数,设置新数组容量等)
  • 分段部分(计算每个线程需要处理的区间段)
  • 处理部分(将旧数组的节点转移到新数组的对应位置上)

在ConcurrentHashMap的扩容方法中,核心代码的最外层是一个循环,每次分配数组的一段给线程,然后通过设置 占位符 的方法,变相地告知其他线程某个索引位置正在处理,这样就保证了并发安全性。在处理时,通过将索引位置链表上的节点按hash取余结果分为 低位高位 两种,高位节点放在新数组的高位处,低位节点则放在原数组的对应位置,最终完成了数组的转移

addCount() —— 增加元素个数

这个方法并不是像方法表面意义那样像table中添加元素,在调用这个方法的时候我们的元素早就已经添加到table中了。实际上,这个方式是在添加元素之后,增加 元素个数统计值 ,同时承担了判断是否扩容的职责,如下

/**
     * @param x 要增加的元素个数
     * @param check 当小于0时,不检查是否进行扩容,
     * 当小于等于1时,只在非竞争状态下检查是否需要扩容
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 当counterCells为空,且直接增加baseCount的值成功,就跳过这一步
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            // 判断是否发生竞争的标志量
            boolean uncontended = true;
            // 当发生以下几种情况之一时,需要进一步操作
            // 1. counterCells为空
            // 2. counterCells中取一个随机位置:
            // 		- 这个位置的值为空
            // 		- 或者给这个位置的值通过CAS操作加上【要增加的元素个数值】失败
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 相当于加强版的addCount,里面通过死循环来进行赋值
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // s为当前元素总数
            s = sumCount();
        }
        // 检查是否需要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 如果元素总数超过容量,或数组为空,就进行扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                // 说明正在进行扩容
                if (sc < 0) {
                	// 发生扩容结束、线程上限或者属性被修改等异常,结束循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 尝试给扩容线程数+1,表示增加了一个线程帮助扩容
                    // 这里我不是很理解为什么+1,sc小于0时的+1操作不是表示减少一个扩容线程吗
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 如果不是负数,就左移16位然后+2,变成一个负数,高16位是标识符,低16位是2
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
复制代码

这个方法相对而言就好理解多了,分为以下几步:

  1. 检查计数盒是否为空,如果不为空,转到第3步
  2. addCount方法会尝试修改baseCount,如果不成功,就进入下一步
  3. 给计数盒上的随机索引位置处,加上需要增加的个数,如果失败,就循环重试
  4. 如果需要检查扩容,就调用transfer方法进行扩容

sumCount() —— 统计元素个数

这里的统计元素方法并不是简单地返回baseCount或者遍历table计算,如下

final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
复制代码

我们会发现元素的总数相当于 baseCount + counterCells数组非空元素值之和 ,现在我们就能知道couterCells是什么意思了。在addCount方法中,我们发现本来应加在baseCount上的值,有一部分加在了counterCells数组中,也就是说元素总数应该为baseCount和couterCells数组的所有值之和

常用方法

get()

方法和HashMap有很大类似,我们直接看如下代码

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        // 如果输入不为空且索引节点也不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 说明是树节点
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 通过循环从链表中找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
复制代码

了解过HashMap的get函数之后,这里的代码就不难理解了。为了保证并发安全性,通过**tabAt()**方法获取某索引节点的值

整段代码的流程和HashMap中的get函数基本一致:

  • 检查数组是否为空,如果不为空,进入下一步
  • 获取索引位置的节点
  • 检查key值是否一致,如果一致则直接返回,否则进入下一步
  • 如果是树节点,按照树节点的方法查找并返回指定key值节点
  • 否则说明是链表,通过循环来遍历查找
  • 如果还找不到,说明节点不存在,返回null

put()

因为put()方法和HashMap中的put()方法类似,都是在内部调用了一个putVal方法,所以我们就直接来看下面的putVal()方法

/** 
	 * @param onlyIfAbsent 为true表示不覆盖原值
	 */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 理论上这里是第一次调用initTable()方法的地方
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 如果索引位置为空,则直接赋值
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果节点正在转移中,则通过helpTransfer()方法加速转移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 查找时先给节点加对象锁
                synchronized (f) {
                	// 如果索引位置上的节点和f相等
                    if (tabAt(tab, i) == f) {
                    	// 如果hash值大于0
                        if (fh >= 0) {
                        	// 记录经过的链表上的节点个数
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到指定的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 如果设置了覆盖原值,就用新值替换旧值
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 沿着链表找
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是树节点,就用树节点的方法来找
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                	// 超过了阈值(8)之后就将链表转换成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 总结点数加1
        addCount(1L, binCount);
        return null;
    }
复制代码

流程和HashMap中的put方法基本一致,我们重点是控制并发安全的方法。在ConcurrentHashMap中,有以下几部分是与并发相关的:

  • 最外层是一个死循环,表示赋值失败会再次进行尝试
  • 通过 tabAt()/casTabAt() 方法 获取/设置 索引位置的值
  • 如果发现节点正在转移中(扩容时需要转移节点),通过helpTransfer()方法协助扩容
  • 先给索引位置的首节点加对象锁,再查找节点

其余方法

其余的常用方法对并发安全的体现就不如以上方法明显,所以在这里就不再赘述

总结

ConcurrentHashMap相比于JDK7中的分段锁,采用了 volatile+CAS+synchronized 的机制,将当前数组的状态在线程间相互传递,实现了各种高效的操作。当然,由于为了保证操作能够完成,在方法中大量使用了死循环判断,所以在多线程竞争激烈的情况下还是有可能发生线程阻塞的情况


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Web Designer's Idea Book, Vol. 2

The Web Designer's Idea Book, Vol. 2

Patrick McNeil / How / 2010-9-19 / USD 30.00

Web Design Inspiration at a Glance Volume 2 of The Web Designer's Idea Book includes more than 650 new websites arranged thematically, so you can easily find inspiration for your work. Auth......一起来看看 《The Web Designer's Idea Book, Vol. 2》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

SHA 加密
SHA 加密

SHA 加密工具