ConcurrentHashMap
包
可以看到这个 ConcurrentHashMap
是位于并发包下面的, 这可是大名鼎鼎的 JUC 呀
并发涉及到线程安全呀,锁的知识点,还有诸如关键字 volatile 等 有关内存屏障的东西 , 还有 Unsafe
类这个更底层的! 😄
嘿嘿~ 我们知道这个 HashMap
是线程不安全的~ ,而 线程安全的 Map
就有这个 HashTable
还有这个 ConcurrentHashMap
。
看到这里不知道小伙伴们会不会 有个小小的疑惑 ?
(・∀・(・∀・(・∀・*)
为啥有了 HashTable
还要再设计这个
ConcurrentHashMap
呢
我们直接来到 ConcurrentHashMap
的源码处,看看作者是怎么说的😝
可以看到它说 ConcurrentHashMap
在检索数据时不需要锁,也不支持 锁住整个表
来阻止其他线程的访问 ,它的操作方法基本和 HashTable
一样,只是底层对锁的使用
细节不一样~
让我们直观滴感受下 HashTable
的 **特别 **吧~
可以看到 它几乎对所有的操作都加了这个 synchronized
,这会导致每次操作都直接锁表,效率很低!
连 get 都加锁。。
所以才有了这个 ConcurrentHashMap
特点
HashTable
和ConcurrentHashMap
的 共同特点 是 不允许 key 和 value 为 null , 注意它们的 键值 都 不能是 null 呀 ,而HashMap
就比较随意,都可以是 null。
HashTable
位于这个java.util
包下,结合上文讲的这个 我们知道它属于 这个fail-fast
, 不支持并发修改 而ConcurrentHashMap
位于JUC
包下,属于fail-safe
,它 允许并发修改,但是 无法保证在迭代过程中获取到最新的值
结构
1.7 版本
请看图~
如图所示,在 JDK1.7 中, ConcurrentHashMap
是由 Segment
数组 + HashEntry
组成 ,数据结构上和 HashMap
一样,仍然是数组加链表。
ConcurrentHashMap
的一个最主要的特点就是 分段锁 (1)
它默认允许的并发量是16
它的这个分段呢,我们从图中也可以分析得出,其实就是给 HashMap
再封装一个
HashMap
的样子~ 变成这个
Segment
数组 来控制这个并发~
详情请看👇
Segment
代码如下
Segment
是 ConcurrentHashMap
中的一个内部类,继承了 ReentrantLock
** 这
个可重入锁(2)**
让我们来看看这个 put 操作~
@SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 关键步骤1 s = ensureSegment(j); // 关键步骤2 return s.put(key, hash, value, false); } 复制代码
可以发现它是通过这个 UNSAFE
类去获取这个这个 Segment
。
跟随源码来到这个 ensureSegment
方法中,这个方法主要用来创建和获取这个
Segment
。
可以看到 代码注释中关键步骤 3 使用到了这个 CAS锁(3)
@SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 关键步骤 3 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } 复制代码
顺带提一下~
初始化 ConcurrentHashMap
时,会先初始化一个 Segment
最后来到主角~ // 上面代码注释的 关键步骤2
思路: 先通过 key
找到对应的 Segment
,接着再对它其中的 HashEntry
进行操
作~
s.put 源码如下
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 步骤1 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 步骤2 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // 步骤3 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 步骤4 unlock(); } return oldValue; } 复制代码
步骤 1:的主要作用是上锁,tryLock()
失败的话会进入一个 自旋获取锁 的过程,不
断的尝试,当尝试的次数大于最大次数 MAX_SCAN_RETRIES
时就调用 lock()
获取锁
(阻塞锁)~
步骤2: 遍历 HashEntry
,如果不为空则判断传入的 key 和当前遍历的 key 是否相
等,相等时如果onlyIfAbsent=false
时 覆盖旧的 value。
步骤3: 不为空则需要新建一个 HashEntry
并加入到 Segment
中,同时会先判断是
否需要扩容。
步骤4: 最后会解除在 1 中所获取当前 Segment 的锁。
get 操作不用锁~
原理也一样,根据key
的 hash
值找到那个 segment
,再找到里面的 hashEntry
虽然 ConcurrentHashMap
解决了这个并发问题,但是从由于数据结构和 1.7 的
HashMap
一样,所以都会有查询效率低的问题。
让我们来看看1.8的有啥不同吧~
1.8 版本
请看图~
为啥1.8 引入红黑树呢~
链表 查询时间复杂度 O(n) , 插入时间复杂度O(1)
红黑树 查询和插入时间复杂度都是 O(lgn)
主要特点~
采用了 CAS + synchronized
来保证并发安全性,放弃原有的 Segment 分段锁 。
Node 节点
稍微提下:
采用 volatile
修饰 是为了保证变量的 可见性
,禁止指令重排
put 操作
源码如下:
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 关键点 1 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 关键点 2 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } 复制代码
关键点 1: 主要通过 cas 的方式去设置首节点
关键点2: 通过 synchronized
去锁住这个首节点,接着通过 if (tabAt(tab, i)
== f)
去判断是不是同个首节点 是的话再对其中的 数据进行操作
总结
ConcurrentHashMap
jdk版本 | 1.7 | 1.8 |
底层实现 | 数组+链表 | 数组+链表 / 红黑树 |
数据结构 | Segment 数组 + HashEntry 节点 |
Node 节点 |
锁 | 分段锁,默认并发是16,一旦初始化,Segment 数组大小就固定,后面不能扩容 |
CAS + synchronized 来保证并发安全性 |
put 操作 | 先获取锁,根据 key 的 hash 值 定位到 Segment ,再根据 key 的 hash 值 找到具体的 HashEntry ,再进行插入或覆盖,最后释放锁 |
根据 key 的 hash 值 定位到 Node 节点,再判断首节点是否为空,空的话通过 cas 去赋值首节点 ; 首节点非空的话,会用 synchronized 去锁住首节点,并判断是是同个首节点,是的话再去操作 |
释放锁 | 需要显示调用 unlock() |
不需要 |