概述
JDK1.7中的ConcurrentHashMap间接地实现了Map,并将每一个元素称为分段锁segment,每个segment都是一个HashEntry<K,V>数组,称为table,table的每个元素都是一个HashEntry的单向队列。
「HashTable是给整个容器加锁,ConcurrentHashMap是给每个segment加锁,」当一个线程修改segment 0时,其他线程也可以修改其它segment,即 只要不同的线程同一时刻访问的是不同的segment,就不会发生写冲突,比HashMap性能更好。
它维护了一个 segment 数组,每个 segment 对应一把锁
- 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
- 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化
构造器分析
1. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { 2. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 3. throw new IllegalArgumentException(); 4. if (concurrencyLevel > MAX_SEGMENTS) 5. concurrencyLevel = MAX_SEGMENTS; 6. // ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小 7. 8. int sshift = 0; 9. int ssize = 1; 10. while (ssize < concurrencyLevel) { 11. ++sshift; 12. ssize <<= 1; 13. } 14. // segmentShift 默认是 32 - 4 = 28 15. 16. this.segmentShift = 32 - sshift; 17. // segmentMask 默认是 15 即 0000 0000 0000 1111 18. 19. this.segmentMask = ssize - 1; 20. if (initialCapacity > MAXIMUM_CAPACITY) 21. initialCapacity = MAXIMUM_CAPACITY; 22. int c = initialCapacity / ssize; 23. if (c * ssize < initialCapacity) 24. ++c; 25. int cap = MIN_SEGMENT_TABLE_CAPACITY; 26. while (cap < c) 27. cap <<= 1; 28. // 创建 segments and segments[0] 29. 30. Segment<K,V> s0 = 31. 32. new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 33. (HashEntry<K,V>[])new HashEntry[cap]); 34. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 35. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] 36. 37. this.segments = ss; 38. }
构造完成,如下图所示
可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好
其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment
例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位
结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment
put 流程
1. public V put(K key, V value) { 2. Segment<K, V> s; 3. if (value == null) 4. throw new NullPointerException(); 5. int hash = hash(key); 6. // 计算出 segment 下标 7. 8. int j = (hash >>> segmentShift) & segmentMask; 9. 10. // 获得 segment 对象, 判断是否为 null, 是则创建该 segment 11. 12. if ((s = (Segment<K, V>) UNSAFE.getObject 13. (segments, (j << SSHIFT) + SBASE)) == null) { 14. // 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null, 15. 16. // 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性 17. 18. s = ensureSegment(j); 19. } 20. // 进入 segment 的put 流程 21. 22. return s.put(key, hash, value, false); 23. }
segment 继承了可重入锁(ReentrantLock),它的 put 方法为
1. final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2. // 尝试加锁 3. HashEntry<K, V> node = tryLock() ? null : 4. // 如果不成功, 进入 scanAndLockForPut 流程 5. // 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程 6. // 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来 7. scanAndLockForPut(key, hash, value); 8. // 执行到这里 segment 已经被成功加锁, 可以安全执行 9. V oldValue; 10. try { 11. HashEntry<K, V>[] tab = table; 12. int index = (tab.length - 1) & hash; 13. HashEntry<K, V> first = entryAt(tab, index); 14. for (HashEntry<K, V> e = first; ; ) { 15. if (e != null) { 16. // 更新 17. K k; 18. if ((k = e.key) == key || 19. 20. (e.hash == hash && key.equals(k))) { 21. oldValue = e.value; 22. if (!onlyIfAbsent) { 23. e.value = value; 24. ++modCount; 25. } 26. break; 27. } 28. e = e.next; 29. } else { 30. // 新增 31. // 1) 之前等待锁时, node 已经被创建, next 指向链表头 32. if (node != null) 33. node.setNext(first); 34. else 35. // 2) 创建新 node 36. node = new HashEntry<K, V>(hash, key, value, first); 37. int c = count + 1; 38. // 3) 扩容 39. if (c > threshold && tab.length < MAXIMUM_CAPACITY) 40. rehash(node); 41. else 42. // 将 node 作为链表头 43. setEntryAt(tab, index, node); 44. ++modCount; 45. count = c; 46. oldValue = null; 47. break; 48. } 49. } 50. } finally { 51. unlock(); 52. } 53. return oldValue; 54. }
rehash 流程
发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全
1. private void rehash(HashEntry<K,V> node) { 2. HashEntry<K,V>[] oldTable = table; 3. int oldCapacity = oldTable.length; 4. int newCapacity = oldCapacity << 1; 5. threshold = (int)(newCapacity * loadFactor); 6. HashEntry<K,V>[] newTable = 7. 8. (HashEntry<K,V>[]) new HashEntry[newCapacity]; 9. int sizeMask = newCapacity - 1; 10. for (int i = 0; i < oldCapacity ; i++) { 11. HashEntry<K,V> e = oldTable[i]; 12. if (e != null) { 13. HashEntry<K,V> next = e.next; 14. int idx = e.hash & sizeMask; 15. if (next == null) // Single node on list 16. 17. newTable[idx] = e; 18. else { // Reuse consecutive sequence at same slot 19. 20. HashEntry<K,V> lastRun = e; 21. int lastIdx = idx; 22. // 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用 23. 24. for (HashEntry<K,V> last = next; 25. last != null; 26. last = last.next) { 27. int k = last.hash & sizeMask; 28. if (k != lastIdx) { 29. lastIdx = k; 30. lastRun = last; 31. } 32. } 33. newTable[lastIdx] = lastRun; 34. // 剩余节点需要新建 35. 36. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { 37. V v = p.value; 38. int h = p.hash; 39. int k = h & sizeMask; 40. HashEntry<K,V> n = newTable[k]; 41. newTable[k] = new HashEntry<K,V>(h, p.key, v, n); 42. } 43. } 44. } 45. } 46. // 扩容完成, 才加入新的节点 47. 48. int nodeIndex = node.hash & sizeMask; // add the new node 49. 50. node.setNext(newTable[nodeIndex]); 51. newTable[nodeIndex] = node; 52. 53. // 替换为新的 HashEntry table 54. 55. table = newTable; 56. }
get 流程
get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容
1. public V get(Object key) { 2. Segment<K, V> s; // manually integrate access methods to reduce overhead 3. 4. HashEntry<K, V>[] tab; 5. int h = hash(key); 6. // u 为 segment 对象在数组中的偏移量 7. 8. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 9. // s 即为 segment 10. 11. if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && 12. 13. (tab = s.table) != null) { 14. for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile 15. 16. (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE); 17. e != null; e = e.next) { 18. K k; 19. if ((k = e.key) == key || (e.hash == h && key.equals(k))) 20. return e.value; 21. } 22. } 23. return null; 24. }
size 计算流程
- 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
- 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回
1. public int size() { 2. // Try a few times to get accurate count. On failure due to 3. // continuous async changes in table, resort to locking. 4. final Segment<K,V>[] segments = this.segments; 5. int size; 6. boolean overflow; // true if size overflows 32 bits 7. 8. long sum; // sum of modCounts 9. 10. long last = 0L; // previous sum 11. 12. int retries = -1; // first iteration isn't retry 13. 14. try { 15. for (;;) { 16. if (retries++ == RETRIES_BEFORE_LOCK) { 17. // 超过重试次数, 需要创建所有 segment 并加锁 18. 19. for (int j = 0; j < segments.length; ++j) 20. ensureSegment(j).lock(); // force creation 21. 22. } 23. sum = 0L; 24. size = 0; 25. overflow = false; 26. for (int j = 0; j < segments.length; ++j) { 27. Segment<K,V> seg = segmentAt(segments, j); 28. if (seg != null) { 29. sum += seg.modCount; 30. int c = seg.count; 31. if (c < 0 || (size += c) < 0) 32. overflow = true; 33. } 34. } 35. if (sum == last) 36. break; 37. last = sum; 38. } 39. } finally { 40. if (retries > RETRIES_BEFORE_LOCK) { 41. for (int j = 0; j < segments.length; ++j) 42. segmentAt(segments, j).unlock(); 43. } 44. } 45. return overflow ? Integer.MAX_VALUE : size; 46. }


