ConcurrentHashMap 实现原理
JDK1.7
首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。
在JDK1.7中,ConcurrentHashMap采用Segment + HashEntry的方式进行实现,结构如下:
一个 ConcurrentHashMap 里包含一个 Segment 数组。
Segment 的结构和HashMap类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个HashEntry数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment的锁。
- 该类包含两个静态内部类 HashEntry 和 Segment ;前者用来封装映射表的键值对,后者用来充当锁的角色;
- HashEntry 内部使用 volatile 的 value 字段来保证可见性,get 操作需要保证的是可见性,所以并没有什么同步逻辑。
- Segment 是一种可重入的锁 ReentrantLock,每个 Segment 守护一个HashEntry 数组里得元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 锁。
get 操作需要保证的是可见性,所以并没有什么同步逻辑
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key.hashCode()); //利用位操作替换普通数学运算 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 以Segment为单位,进行定位 // 利用Unsafe直接进行volatile access if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //省略 } return null; }
而对于 put 操作,首先是通过二次哈希避免哈希冲突,然后以 Unsafe 调用方式,直接获取相应的 Segment,然后进行线程安全的 put 操作:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 二次哈希,以保证数据的分散性,避免哈希冲突 int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
其核心逻辑实现在下面的内部方法中:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // scanAndLockForPut会去查找是否有key相同Node // 无论如何,确保获取锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 更新已有value... } else { // 放置HashEntry到特定位置,如果超过阈值,进行rehash // ... } } } finally { unlock(); } return oldValue; }
JDK1.8
在JDK1.8中,放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现。
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。
- 总体结构上,它的内部存储和 HashMap 结构非常相似,同样是大的桶(bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要更细致一些。
- 其内部仍然有 Segment 定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
- 因为不再使用 Segment,初始化操作大大简化,修改为 lazy-load 形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。
- 数据存储利用 volatile 来保证可见性。
- 使用 CAS 等操作,在特定场景进行无锁并发操作。
- 使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。
另外,需要注意的是,“线程安全”这四个字特别容易让人误解,因为ConcurrentHashMap 只能保证提供的原子性读写操作是线程安全的。
误区
我们来看一个使用 Map 来统计 Key 出现次数的场景吧,这个逻辑在业务代码中非常常见。
开发人员误以为使用了 ConcurrentHashMap 就不会有线程安全问题,于是不加思索地写出了下面的代码:
- 在每一个线程的代码逻辑中先通过 containsKey方法判断可以 是否存在。
- key 存在则 + 1,否则初始化 1.
// 共享数据 ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT); public void normaluse(String key) throws InterruptedException { if (freqs.containsKey(key)) { //Key存在则+1 freqs.put(key, freqs.get(key) + 1); } else { //Key不存在则初始化为1 freqs.put(key, 1L); } }
大错特错啊朋友们,需要注意 ConcurrentHashMap 对外提供的方法或能力的限制:
- 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
- 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。
因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。
显然,利用 size 方法计算差异值,是一个流程控制。
- 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
正确写法:
//利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数 freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
- 使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断 Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为 Value,也就是新创建一个 LongAdder 对象,最后返回 Value。
- 由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。