为并发而生的 ConcurrentHashMap
数据结构
Java 7为实现并发访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。
Java 8取消了基于 Segment 的分段锁思想,改用CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。
和 HashMap 中的语义一样,代表整个哈希表。在第一次插入时才懒加载初始化。大小永远是 2 的次幂。被迭代器直接访问。
一个连接表,用于哈希表扩容,扩容完成后会被重置为 null
保存着整个哈希表中存储的所有的结点的个数总和,类似于 HashMap 的 size 属性。主要用于当没有线程竞争时使用,也会作为哈希表初始化过程中的反馈。通过CAS 更新。
这是一个重要的属性,无论是初始化哈希表,还是扩容 rehash,都需要该依赖。有如下取值:
- >0:相当于 HashMap 中的 threshold,表示阈值
- 0:默认值
- -1:代表哈希表正在进行初始化
- <-1:代表有多个线程正在进行扩容
构造函数的实现也和HashMap类似
若传入 32,实际大小 64。即最接近1.5n+1的 2的次幂。因为如果你想存入 15 个元素,那么 16 是存不下的,需要扩容,所以直接给你初始化为 32 的容量。
寻址方式
同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引;
同样为了避免不太好的Key的hashCode设计,它通过如下方法计算得到Key的最终哈希值.
// usable bits of normal node hash static final int HASH_BITS = 0x7fffffff;
不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保证最高位为0(从而保证最终结果为正整数)
8.3 同步方式
对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为当前值;
如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,然后进行操作;
如果该put操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率.
对于读操作,由于数组被volatile修饰,因此不用担心数组的可见性问题;
同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变更,无须关心它们被修改后的可见性问题;
而其Value及对下一个元素的引用由volatile修饰,可见性也有保障.
8.4 操作
put方法和remove方法都会通过addCount方法维护Map的size;
size方法通过sumCount获取由addCount方法维护的Map的size.
下面我们主要来分析下 ConcurrentHashMap 的一个核心方法 put,我们也会一并解决掉该方法中涉及到的扩容、辅助扩容,初始化哈希表等方法。
8.4.1 put
HashMap多线程并发添加元素会导致数据丢失等并发问题,那么 ConcurrentHashMap 又是如何做到并发添加
的呢?
final V putVal(K key, V value, boolean onlyIfAbsent) { //对传入的参数进行合法性判断 if (key == null || value == null) throw new NullPointerException(); //计算键所对应的 hash 值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果哈希表还未初始化,那么初始化它 if (tab == null || (n = tab.length) == 0) tab = initTable(); //根据键的 hash 值找到哈希数组相应的索引位置 //如果为空,那么以CAS无锁式向该位置添加一个节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; }
这里需要详细说明的只有initTable
方法:初始化哈希表,它同时只允许一个线程进行初始化操作。
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 如果表为空才进行初始化操作 while ((tab = table) == null || tab.length == 0) { // sizeCtl 小于零说明已经有线程正在进行初始化操作 // 当前线程应该放弃 CPU 的使用 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // 否则说明尚未有线程对表进行初始化,那么本线程就来做这个工作 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //保险起见,再次判断下表是否为空 try { if ((tab = table) == null || tab.length == 0) { //至此, sc 大于零说明容量已经初始化了,否则使用默认容量,其他线程再也无法初始化!!! int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //根据容量构建数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //计算阈值,等效于 n*0.75 sc = n - (n >>> 2); } } finally { //设置阈值 sizeCtl = sc; } break; } } return tab; }