今天轮到我来撕ConcurrentHashMap了

简介: 为了让自己在面试的时候能够不卑不亢地将ConcurrentHashMap娓娓道来,鄙人下定决心要好好整理ConcurrentHashMap,奈何实在是太复杂了……尽力而为吧。

前言


     

你是否有过这样的经历:信心满满地进入面试房间,慷慨激昂地进行自我介绍,一度以为自己稳操胜券,直到面试官开口:“你是学Java的是吧?那你知道ConcurrentHashMap的实现原理吗?你知道ConcurrentHashMap1.7和1.8的区别吗?你知道ConcurrentHashMap使用什么技术来保证线程安全吗?你能说说ConcurrentHashMap的put()方法吗?跟我说说ConcurrentHashmap 不支持 key 或者 value 为 null 的原因?put()方法如何实现线程安全呢?ConcurrentHashMap扩容机制?ConcurrentHashMap的get方法是否要加锁,为什么?为什么使用ConcurrentHashMap?


ConcurrentHashMap迭代器是强一致性还是弱一致性?HashMap呢?JDK1.7与JDK1.8中ConcurrentHashMap的区别?……”


为了让自己在面试的时候能够不卑不亢地将ConcurrentHashMap娓娓道来,鄙人下定决心要好好整理ConcurrentHashMap,奈何实在是太复杂了……尽力而为吧。


一、ConcurrentHashMap的实现原理



ConcurrentHashMap的出现主要为了解决hashmap在并发环境下不安全。


JDK1.8ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,CAS等乐观锁技术来减少锁竞争对于性能的影响,ConcurrentHashMap保证线程安全的方案是:

       1、JDK1.8:synchronized+CAS+HashEntry+红黑树;

       2、JDK1.7:ReentrantLock+Segment+HashEntry。


1、 JDK1.7:Segment+HashEntry


在JDK1.7中ConcurrentHashMap由Segment(分段锁)数组结构和HashEntry数组组成,且主要通过Segment(分段锁)技术实现线程安全。


Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构,因此在ConcurrentHashMap查询一个元素的过程需要进行两次Hash操作,如下所示:


第一次Hash定位到Segment,

第二次Hash定位到元素所在的链表的头部

7e8e3fbe25c241e2941b4673ba2ccac7.png


JDK7中的ConcurrentHashMap由Segment和 HashEntry组成,即ConcurrentHashMap把哈希桶数组切分成小数组(Segment),每个小数组有n个 HashEntry组成。


将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,实现并发访问。


看看Segment的源码:

52e814c1959b4a1cac4010676125a853.png2f2a6129f9b046198bed2eef307fc4bd.png


Segment是ConcurrentHashMap的一个内部类主要的组成,继承自ReentrantLock。volatile修饰HashEntry<K, V>[] table可以保证在数组扩容时的可见性。


volatile修饰HashEntry 的数据value和下一个节点next,保证了多线程环境下数据获取时的可见性!


2、JDK1.8:synchronized+CAS+红黑树


JDK8 中ConcurrentHashMap 选择了与 HashMap 相同的 Node 数组 + 链表 + 红黑树结构。


在锁的实现上,抛弃了原有的 Segment 分段锁,采用 CAS + synchronized 实现更加细粒度的锁。将锁的级别控制在了更细粒度的哈希桶数组元素级别,只需要锁住这个链表头节点(或红黑树的根节点),就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。


15e76a2616c44aac87158e9d67fae00b.png


Node的源码:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }


ConcurrentHashMap采用Node类作为基本的存储单元,每个键值对(key-value)都存储在一个Node中,使用了volatile关键字修饰value和next,保证并发的可见性。其中Node子类有:


       1、ForwardingNode:扩容节点,只是在扩容阶段使用的节点,主要作为一个标记,在处理并发时起着关键作用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的特性,提高了并发效率;

       2、TreeBin:TreeNode的代理节点,用于维护TreeNodes,ConcurrentHashMap的红黑树存放的是TreeBin;

       3、TreeNode:用于树结构中,红黑树的节点(当链表长度大于8时转化为红黑树),此节点不能直接放入桶内,只能是作为红黑树的节点;

       4、ReservationNode:保留结点


ConcurrentHashMap中查找元素、替换元素和赋值元素都是基于sun.misc.Unsafe原子操作实现多并发的无锁化操作。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }



3、JDK1.7与JDK1.8中ConcurrentHashMap的区别总结


其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap。相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock + Segment + HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树都是如此。


(1)数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构;

(2)保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全;

(3)锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组头结点加锁;

(4)查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。


二、ConcurrentHashMap的put方法



1、流程及源码


    /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     *         {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }


1、如果key或者value为null,则抛出空指针异常,和HashMap不同的是HashMap单线程是允许为Null


if (key == null || value == null) throw new NullPointerException();


2、for的死循环,为了实现CAS的无锁化更新,如果table为null或者table的长度为0,则初始化table,调用initTable()方法(第一次put数据,调用默认参数实现,其中重要的sizeCtl参数)。


    //计算索引的第一步,传入键值的hash值
    int hash = spread(key.hashCode());
    int binCount = 0; //保存当前节点的长度
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); //初始化Hash表
        ...
    }



  3、确定元素在Hash表的索引。通过hash算法可以将元素分散到哈希桶中。在ConcurrentHashMap中通过如下方法确定数组索引:


第一步:

static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS; 
    }


第二步:

(length-1) & (h ^ (h >>> 16)) & HASH_BITS);


4、通过tableAt()方法找到位置tab[i]Node,当Node为null时为没有hash冲突的话,使用casTabAt()方法CAS操作将元素插入到Hash表中,ConcurrentHashmap使用CAS无锁化操作,这样在高并发hash冲突低的情况下,性能良好。


else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //利用CAS操作将元素插入到Hash表中
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;  // no lock when adding to empty bin(插入null的节点,无需加锁)
            }


5、当f不为null时,说明发生了hash冲突,当f.hash == MOVED==-1 时,说明ConcurrentHashmap正在发生resize操作,使用helpTransfer()方法帮助正在进行resize操作。


else if ((fh = f.hash) == MOVED) //f.hash == -1 
        //hash为-1 说明是一个forwarding nodes节点,表明正在扩容
        tab = helpTransfer(tab, f);


6、以上情况都不满足的时,使用synchronized同步块上锁当前节点Node ,并判断有没有线程对数组进行了修改,如果没有则进行:


*遍历该链表并统计该链表长度binCount,查找是否有和key相同的节点,如果有则将查找到节点的val值替换为新的value值,并返回旧的value值,否则根据key,value,hash创建新Node并将其放在链表的尾部;


*如果Node f是TreeBin的类型,则使用红黑树的方式进行插入。然后则退出synchronized(f)锁住的代码块


//当前节点加锁
 synchronized (f) {
 //判断下有没有线程对数组进行了修改
 if (tabAt(tab, i) == f) {
       //如果hash值是大于等于0的说明是链表
        if (fh >= 0) {
              binCount = 1;
              for (Node<K,V> e = f;; ++binCount) {
                    K ek;
                   //插入的元素键值的hash值有节点中元素的hash值相同,替换当前元素的值
                      if (e.hash == hash &&
                           ((ek = e.key) == key ||
                            (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                //替换当前元素的值
                                e.val = value;
                            break;
                        }
                     Node<K,V> pred = e;
                      //如果循环到链表结尾还没发现,那么进行插入操作
                     if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key, value);
                            break;
                           }
                }
          }else if (f instanceof TreeBin) { //节点为树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                //替换旧值
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }     


7、执行完synchronized(f)同步代码块之后会先检查binCount,如果大于等于TREEIFY_THRESHOLD = 8则进行treeifyBin操作尝试将该链表转换为红黑树。


if (binCount != 0) {
              //如果节点长度大于8,转化为树
              if (binCount >= TREEIFY_THRESHOLD)
                   treeifyBin(tab, i);
              if (oldVal != null)
                   return oldVal; 
               break;
         }


8、执行了一个addCount方法,主要用于统计数量以及决定是否需要扩容.


addCount(1L, binCount);


2、put()方法如何实现线程安全?


(1)在第一次put数据时,调用initTable()方法

 /**  
 * Hash表的初始化和调整大小的控制标志。为负数,Hash表正在初始化或者扩容;  
 * (-1表示正在初始化,-N表示有N-1个线程在进行扩容)  
 * 否则,当表为null时,保存创建时使用的初始化大小或者默认0;  
 * 初始化以后保存下一个调整大小的尺寸。  
 */  
 private transient volatile int sizeCtl;  
     //第一次put,初始化数组  
     private final Node<K,V>[] initTable() {  
         Node<K,V>[] tab; int sc;  
         while ((tab = table) == null || tab.length == 0) {  
             //如果已经有别的线程在初始化了,这里等待一下  
             if ((sc = sizeCtl) < 0)  
             Thread.yield(); // lost initialization race; just spin  
             //-1 表示正在初始化  
             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
             ...  
         } finally {  
            sizeCtl = sc;  
         }  
            break;  
         }  
     }  
     return tab;  
 }


使用sizeCtl参数作为控制标志的作用,当在从插入元素时,才会初始化Hash表。在开始初始化的时候,


首先判断sizeCtl的值,如果sizeCtl < 0,说明有线程在初始化,当前线程便放弃初始化操作。否则,将SIZECTL设置为-1,Hash表进行初始化;


初始化成功以后,将sizeCtl的值设置为当前的容量值。


在不存在hash冲突的时:


else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
     //利用CAS操作将元素插入到Hash表中  
     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))  
     break;  // no lock when adding to empty bin(插入null的节点,无需加锁)  
 }


(f = tabAt(tab, i = (n - 1) & hash)) == null中使用tabAt原子操作获取数组,并利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))CAS操作将元素插入到Hash表中


在存在hash冲突时,先把当前节点使用关键字synchronized加锁,然后再使用tabAt()原子操作判断下有没有线程对数组进行了修改,最后再进行其他操作。


为什么要锁住更新操作的代码块?因为发生了哈希冲突,当前线程正在f所在的链表上进行更新操作,假如此时另外一个线程也需要到这个链表上进行更新操作,则需要等待当前线程更新完后再执行。

//当前节点加锁  
synchronized (f) {  
     //这里判断下有没有线程对数组进行了修改  
     if (tabAt(tab, i) == f) {  
     ......//do something  
 }
}


三、ConcurrentHashMap不支持key或者value为null的原因



ConcurrentHashmap和hashMap不同的是,concurrentHashMap的key和value都不允许为null。


因为concurrenthashmap它们是用于多线程的,并发的 ,如果map.get(key)得到了null,不能判断到底是映射的value是null,还是因为没有找到对应的key而为空,而用于单线程状态的hashmap却可以用containKey(key) 去判断到底是否包含了这个null。


四、ConcurrentHashMap的get方法是否要加锁,为什么?


看源码:

 

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }


ConcurrentHashMap的get方法就是从Hash表中读取数据,并不会与扩容不冲突,因此该方法也不需要同步锁,这样可提高ConcurrentHashMap 的并发性能。


五、为什么要使用ConcurrentHashMap



  • HashMap在多线程中进行put方法有可能导致程序死循环,因为多线程可能会导致HashMap形成环形链表,(即链表的一个节点的next节点永不为null,就会产生死循环),会导致CPU的利用率接近100%,因此并发情况下不能使用HashMap。
  • HashTable通过使用synchronized保证线程安全,但在线程竞争激烈的情况下效率低下。因为当一个线程访问HashTable的同步方法时,其他线程只能阻塞等待占用线程操作完毕。
  • ConcurrentHashMap使用分段锁的思想,对于不同的数据段使用不同的锁,可以支持多个线程同时访问不同的数据段,这样线程之间就不存在锁竞争,从而提高了并发效率。


六、ConcurrentHashMap迭代器是强一致性还是弱一致性?HashMap呢?



在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器,弱一致迭代器。


在这种迭代方式中,当iterator被创建后集合再发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时new新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,


这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。


七、 ConcurrentHashMap的并发度如何设计的



在JDK7中,实际上就是 ConcurrentHashMap 中的分段锁个数,即Segment[] 数组长度,默认是 16,这个值可以在构造函数中设置。


如果自己设置了并发读,ConcurrentHashMap 会使用大于等于该值的最小的2的幂指数作为实际并发度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个 Segment 内的访问会扩散到不同的 Segment中,从而引起程序性能下降。


在 JDK8 中,已经摒弃了 Segment的概念,选择了 Node数组 + 链表+红黑树结构,并发度大小依赖于数组的大小。  


八、ConcurrentHashMap 和 Hashtable 的效率哪个更高?



ConcurrentHashMap 的效率要高于 Hashtable,因为 Hashtable 给整个哈希表加锁从而实现线程安全。


而 ConcurrentHashMap 的锁粒度更低:

       在 JDK7 中采用Segment锁(分段锁)实现线程安全

       在 JDK8 中采用 CAS + synchronized 实现线程安全


九、还有其他方法能够在多线程下安全地操作Map吗?



182cf4ccf7c6426b8a2a65a035484713.png


可以使用 Collection.synchronizedMap(Map类型的对象)方法进行加同步锁。把对象转换成SynchronizedMap<K,V>类型。


如果传入的是 HashMap对象,其实也是对 HashMap 做的对象做了一层包装,里面使用对象锁来保证多线程场景下,线程安全,本质也是对 HashMap 进行全表锁。在竞争激烈的多线程环境下性能也非常差,不推荐使用!  


相关文章
|
8月前
|
SQL 安全 Java
剑指JUC原理-8.Java内存模型(下)
剑指JUC原理-8.Java内存模型
55 0
|
8月前
|
Java 编译器 测试技术
剑指JUC原理-8.Java内存模型(中)
剑指JUC原理-8.Java内存模型
62 0
|
8月前
|
缓存 安全 前端开发
剑指JUC原理-8.Java内存模型(上)
剑指JUC原理-8.Java内存模型
75 0
|
8月前
|
安全 容器
线程安全的集合类(ConcurrentHashMap面试超高频考点)
线程安全的集合类(ConcurrentHashMap面试超高频考点)
55 0
线程安全的集合类(ConcurrentHashMap面试超高频考点)
|
8月前
|
安全 Java 程序员
线程安全问题(面试常考)
线程安全问题(面试常考)
107 0
线程安全问题(面试常考)
|
安全 算法 Java
JUC第十五讲:JUC集合 - 面试 ConcurrentHashMap 看这篇就够了
JUC第十五讲:JUC集合 - 面试 ConcurrentHashMap 看这篇就够了
|
存储 机器学习/深度学习 算法
源码剖析之ConcurrentHashMap
​ JDK8中ConcurrentHashMap的结构是:数组+链表+红黑树。 ​ 因为在hash冲突严重的情况下,链表的查询效率是O(n),所以jdk8中改成了单个链表的个数大于8时,数组长度小于64就扩容,数组长度大于等于64,则链表会转换为红黑树,这样以空间换时间,查询效率会变O(nlogn)。 ​ 红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。 ​ 在JDK8中ConcurrentHashMap最经点的实现是使用CAS+synchronized+volatile 来保证并发安全
126 0
源码剖析之ConcurrentHashMap
|
存储 机器学习/深度学习 安全
撼动面试官系列 —— ConcurrentHashMap 深入问答
撼动面试官系列 —— ConcurrentHashMap 深入问答
103 0
|
存储 安全 算法
Java并发编程之ConcurrentHashMap源码分析
HashMap多线程put后get为null和多线程put的时候可能导致元素丢失 在多线程环境下,使用HashMap进行put操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用ConcurrentHashMap代替HashMap
256 0
Java并发编程之ConcurrentHashMap源码分析
|
存储 安全 Java
面试突击18:为什么ConcurrentHashMap是线程安全的?
ConcurrentHashMap是HashMap的多线程版本,HashMap在并发操作时会有各种问题,比如死循环问题、数据覆盖等问题。而这些问题,只要使用ConcurrentHashMap就可以完美解决了,那问题来了,ConcurrentHashMap是如何保证线程安全的?它的底层又是如何实现的?接下来我们一起来看。
4893 1
面试突击18:为什么ConcurrentHashMap是线程安全的?