JDK 7 ConcurrentHashMap

简介: JDK 7 ConcurrentHashMap

概述

JDK1.7中的ConcurrentHashMap间接地实现了Map,并将每一个元素称为分段锁segment,每个segment都是一个HashEntry<K,V>数组,称为table,table的每个元素都是一个HashEntry的单向队列。

「HashTable是给整个容器加锁,ConcurrentHashMap是给每个segment加锁,」当一个线程修改segment 0时,其他线程也可以修改其它segment,即 只要不同的线程同一时刻访问的是不同的segment,就不会发生写冲突,比HashMap性能更好。

它维护了一个 segment 数组,每个 segment 对应一把锁

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
  • 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

构造器分析

1. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
2. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
3. throw new IllegalArgumentException();
4. if (concurrencyLevel > MAX_SEGMENTS)
5.             concurrencyLevel = MAX_SEGMENTS;
6. // ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小
7. 
8. int sshift = 0;
9. int ssize = 1;
10. while (ssize < concurrencyLevel) {
11.             ++sshift;
12.             ssize <<= 1;
13.         }
14. // segmentShift 默认是 32 - 4 = 28
15. 
16. this.segmentShift = 32 - sshift;
17. // segmentMask 默认是 15 即 0000 0000 0000 1111
18. 
19. this.segmentMask = ssize - 1;
20. if (initialCapacity > MAXIMUM_CAPACITY)
21.             initialCapacity = MAXIMUM_CAPACITY;
22. int c = initialCapacity / ssize;
23. if (c * ssize < initialCapacity)
24.             ++c;
25. int cap = MIN_SEGMENT_TABLE_CAPACITY;
26. while (cap < c)
27.             cap <<= 1;
28. // 创建 segments and segments[0]
29. 
30.         Segment<K,V> s0 =
31. 
32. new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
33.                         (HashEntry<K,V>[])new HashEntry[cap]);
34.         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
35.         UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
36. 
37. this.segments = ss;
38.     }

构造完成,如下图所示

可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位

结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment

put 流程

1. public V put(K key, V value) {
2.         Segment<K, V> s;
3. if (value == null)
4. throw new NullPointerException();
5. int hash = hash(key);
6. // 计算出 segment 下标
7. 
8. int j = (hash >>> segmentShift) & segmentMask;
9. 
10. // 获得 segment 对象, 判断是否为 null, 是则创建该 segment
11. 
12. if ((s = (Segment<K, V>) UNSAFE.getObject
13.                 (segments, (j << SSHIFT) + SBASE)) == null) {
14. // 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,
15. 
16. // 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
17. 
18.             s = ensureSegment(j);
19.         }
20. // 进入 segment 的put 流程
21. 
22. return s.put(key, hash, value, false);
23.     }

segment 继承了可重入锁(ReentrantLock),它的 put 方法为

1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
2. // 尝试加锁
3.         HashEntry<K, V> node = tryLock() ? null :
4. // 如果不成功, 进入 scanAndLockForPut 流程
5. // 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
6. // 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
7.                 scanAndLockForPut(key, hash, value);
8. // 执行到这里 segment 已经被成功加锁, 可以安全执行
9.         V oldValue;
10. try {
11.             HashEntry<K, V>[] tab = table;
12. int index = (tab.length - 1) & hash;
13.             HashEntry<K, V> first = entryAt(tab, index);
14. for (HashEntry<K, V> e = first; ; ) {
15. if (e != null) {
16. // 更新
17.                     K k;
18. if ((k = e.key) == key ||
19. 
20.                             (e.hash == hash && key.equals(k))) {
21.                         oldValue = e.value;
22. if (!onlyIfAbsent) {
23.                             e.value = value;
24.                             ++modCount;
25.                         }
26. break;
27.                     }
28.                     e = e.next;
29.                 } else {
30. // 新增
31. // 1) 之前等待锁时, node 已经被创建, next 指向链表头
32. if (node != null)
33.                         node.setNext(first);
34. else
35. // 2) 创建新 node
36.                         node = new HashEntry<K, V>(hash, key, value, first);
37. int c = count + 1;
38. // 3) 扩容
39. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
40.                         rehash(node);
41. else
42. // 将 node 作为链表头
43.                         setEntryAt(tab, index, node);
44.                     ++modCount;
45.                     count = c;
46.                     oldValue = null;
47. break;
48.                 }
49.             }
50.         } finally {
51.             unlock();
52.         }
53. return oldValue;
54.     }

rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

1. private void rehash(HashEntry<K,V> node) {
2.         HashEntry<K,V>[] oldTable = table;
3. int oldCapacity = oldTable.length;
4. int newCapacity = oldCapacity << 1;
5.         threshold = (int)(newCapacity * loadFactor);
6.         HashEntry<K,V>[] newTable =
7. 
8.                 (HashEntry<K,V>[]) new HashEntry[newCapacity];
9. int sizeMask = newCapacity - 1;
10. for (int i = 0; i < oldCapacity ; i++) {
11.             HashEntry<K,V> e = oldTable[i];
12. if (e != null) {
13.                 HashEntry<K,V> next = e.next;
14. int idx = e.hash & sizeMask;
15. if (next == null) // Single node on list
16. 
17.                     newTable[idx] = e;
18. else { // Reuse consecutive sequence at same slot
19. 
20.                     HashEntry<K,V> lastRun = e;
21. int lastIdx = idx;
22. // 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用 
23. 
24. for (HashEntry<K,V> last = next;
25.                          last != null;
26.                          last = last.next) {
27. int k = last.hash & sizeMask;
28. if (k != lastIdx) {
29.                             lastIdx = k;
30.                             lastRun = last;
31.                         }
32.                     }
33.                     newTable[lastIdx] = lastRun;
34. // 剩余节点需要新建
35. 
36. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
37. V v = p.value;
38. int h = p.hash;
39. int k = h & sizeMask;
40.                         HashEntry<K,V> n = newTable[k];
41.                         newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
42.                     }
43.                 }
44.             }
45.         }
46. // 扩容完成, 才加入新的节点
47. 
48. int nodeIndex = node.hash & sizeMask; // add the new node
49. 
50.         node.setNext(newTable[nodeIndex]);
51.         newTable[nodeIndex] = node;
52. 
53. // 替换为新的 HashEntry table
54. 
55.         table = newTable;
56.     }

get 流程

get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容

1. public V get(Object key) {
2.         Segment<K, V> s; // manually integrate access methods to reduce overhead
3. 
4.         HashEntry<K, V>[] tab;
5. int h = hash(key);
6. // u 为 segment 对象在数组中的偏移量
7. 
8. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
9. // s 即为 segment
10. 
11. if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null &&
12. 
13.                 (tab = s.table) != null) {
14. for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
15. 
16.                     (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
17.                  e != null; e = e.next) {
18.                 K k;
19. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
20. return e.value;
21.             }
22.         }
23. return null;
24.     }

size 计算流程

  • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
  • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回
1. public int size() {
2. // Try a few times to get accurate count. On failure due to
3. // continuous async changes in table, resort to locking.
4. final Segment<K,V>[] segments = this.segments;
5. int size;
6. boolean overflow; // true if size overflows 32 bits
7. 
8. long sum; // sum of modCounts
9. 
10. long last = 0L; // previous sum
11. 
12. int retries = -1; // first iteration isn't retry
13. 
14. try {
15. for (;;) {
16. if (retries++ == RETRIES_BEFORE_LOCK) {
17. // 超过重试次数, 需要创建所有 segment 并加锁
18. 
19. for (int j = 0; j < segments.length; ++j)
20.                         ensureSegment(j).lock(); // force creation
21. 
22.                 }
23.                 sum = 0L;
24.                 size = 0;
25.                 overflow = false;
26. for (int j = 0; j < segments.length; ++j) {
27.                     Segment<K,V> seg = segmentAt(segments, j);
28. if (seg != null) {
29.                         sum += seg.modCount;
30. int c = seg.count;
31. if (c < 0 || (size += c) < 0)
32.                             overflow = true;
33.                     }
34.                 }
35. if (sum == last)
36. break;
37.                 last = sum;
38.             }
39.         } finally {
40. if (retries > RETRIES_BEFORE_LOCK) {
41. for (int j = 0; j < segments.length; ++j)
42.                     segmentAt(segments, j).unlock();
43.             }
44.         }
45. return overflow ? Integer.MAX_VALUE : size;
46.     }


相关文章
|
存储 安全 Java
【JDK 源码分析】ConcurrentHashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】ConcurrentHashMap 底层结构
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
179 2
|
机器学习/深度学习 存储 Java
认真学习jdk1.8下ConcurrentHashMap的实现原理
认真学习jdk1.8下ConcurrentHashMap的实现原理
144 0
认真学习jdk1.8下ConcurrentHashMap的实现原理
|
安全 Java 索引
认真学习jdk1.7下ConcurrentHashMap的实现原理
认真学习jdk1.7下ConcurrentHashMap的实现原理
227 0
|
机器学习/深度学习 存储 Java
认真学习jdk1.8下ConcurrentHashMap的扩容机制
认真学习jdk1.8下ConcurrentHashMap的扩容机制
596 0
|
安全 Java
JDK 8 ConcurrentHashMap
JDK 8 ConcurrentHashMap
|
存储 算法 安全
JDK1.8中的ConcurrentHashMap使用及场景分析
JDK1.8中的ConcurrentHashMap使用及场景分析
JDK1.8中的ConcurrentHashMap使用及场景分析
|
存储 安全 Java
JDK7中ConcurrentHashMap源码解析
JDK7中ConcurrentHashMap源码解析
JDK7中ConcurrentHashMap源码解析
|
存储 安全 Java
JDK1.8中的ConcurrentHashMap源码分析
JDK1.8中的ConcurrentHashMap源码分析
JDK1.8中的ConcurrentHashMap源码分析
|
Java 容器
JDK1.8 中 ConcurrentHashMap源码分析(一)容器初始化
本文是博主学习JDK源码的记录,此为 ConcurrentHashMap源码分析(一),希望对大家有所帮助。
290 0
JDK1.8 中 ConcurrentHashMap源码分析(一)容器初始化