如果还不了解ConcurrentHashMap的可以看: ConcurrentHashMap概述
ConcurrentHashMap在jdk1.7中的设计
再JDK7中,ConcurrentHashMap使用的是segments+table+链表的结构。
其中对每一个segment进行加锁,那么只要访问的是不同的segment,就可以实现并发访问hashmap的能力了。
每一个segment都是一个HashEntry<K,V>[] table, table中的每一个元素本质上都是一个HashEntry的
单向队列。比如table[3]为首节点,table[3]->next为节点1,之后为节点2,依次类推。
如果不懂volatile 关键字的作用,可以看:volatile关键字作用
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目 的是对于put, remove等操作,可以减少并发冲突,对 // 不属于同一个片段的节点可以并发操作,大大提高了性能 final Segment<K,V>[] segments; // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了 ReentrantLock, 可以作为互拆锁使用 static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; } // 基本节点,存储Key, Value值 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } }
在jdk1.8中做的改进
改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用
table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于
hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均
匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然
ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存
在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
为了说明以上2个改动,看一下put操作是如何实现的。
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新 建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成 后的table。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 如果在链表中找到值为key的节点e,直接设置e.val = value即 可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 如果没有找到值为key的节点,直接新建Node并加入链表即可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果节点数>=8,那么转换链表结构为红黑树结构。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数增加1,有可能触发transfer操作(扩容)。 addCount(1L, binCount); return null; }
另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可
方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。
ConcurrentHashMap jdk1.7、jdk1.8性能比较
public class CompareConcurrentHashMap { private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>(40000); public static void putPerformance(int index, int num) { for (int i = index; i < (num + index) ; i++) map.put(String.valueOf(i), i); } public static void getPerformance2() { long start = System.currentTimeMillis(); for (int i = 0; i < 400000; i++) map.get(String.valueOf(i)); long end = System.currentTimeMillis(); System.out.println("get: it costs " + (end - start) + " ms"); } public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); final CountDownLatch cdLatch = new CountDownLatch(4); for (int i = 0; i < 4; i++) { final int finalI = i; new Thread(new Runnable() { public void run() { CompareConcurrentHashMap.putPerformance(100000 * finalI, 100000); cdLatch.countDown(); } }).start(); } cdLatch.await(); long end = System.currentTimeMillis(); System.out.println("put: it costs " + (end - start) + " ms"); CompareConcurrentHashMap.getPerformance2(); } }
程序运行多次后取平均值,结果如下:
谈谈ConcurrentHashMap1.7和1.8的不同实现
在多线程环境下,使用 HashMap 进行 put 操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用 ConcurrentHashMap 代替 HashMap ,为了对更深入的了解,本文将对JDK1.7和1.8的不同实现进行分析。
JDK1.7
数据结构
jdk1.7中采用 Segment + HashEntry 的方式进行实现,结构如下:
ConcurrentHashMap 初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数
组的大小 cap ,并初始化 Segment 数组的第一个元素;其中 ssize 大小为2的幂次方,默认为16, cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量 initialCapacity 进行计算,计算过
程如下:
if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
其中 Segment 在实现上继承了 ReentrantLock ,这样就自带了锁的功能。
put实现
当执行 put 方法插入数据时,根据key的hash值,在 Segment 数组中找到相应的位置,如果相应位置的Segment 还未初始化,则通过CAS(Compare and Swap,是一个原子操作)进行赋值,接着执行 Segment 对象的 put 方法通过加锁机制插入数据,实现如下:
场景:线程A和线程B同时执行相同 Segment 对象的 put 方法
1、线程A执行 tryLock() 方法成功获取锁,则把 HashEntry 对象插入到相应的位置;
2、线程B获取锁失败,则执行 scanAndLockForPut() 方法,在 scanAndLockForPut 方法中,会通过
重复执行 tryLock() 方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当
执行 tryLock() 方法的次数超过上限时,则执行 lock() 方法挂起线程B;
3、当线程A执行完插入操作时,会通过 unlock() 方法释放锁,接着唤醒线程B继续执行;
size实现
因为 ConcurrentHashMap 是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个 Segment 对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个 Segment 的元素个数时,已经计算过的 Segment 同时可能有数据的插入或者删除,在1.7的实现中,采用了如下方式:
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
先采用不加锁的方式,连续计算元素的个数,最多计算3次:
1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
2、如果前后两次计算结果都不同,则给每个 Segment 进行加锁,再计算一次元素的个数;
JDK1.8
数据结构
1.8中放弃了 Segment 臃肿的设计,取而代之的是采用 Node + CAS + Synchronized (JDK1.6之后优化了锁机制)来保证并发安全进行实现,结构如下:
只有在执行第一次 put 方法时才会调用 initTable() 初始化 Node 数组,实现如下:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
put实现
当执行 put 方法插入数据时,根据key的hash值,在 Node 数组中找到相应的位置,实现如下:
1、如果相应位置的 Node 还未初始化,则通过CAS插入相应的数据;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
2、如果相应位置的 Node 不为空,且当前该节点不处于移动状态,则对该节点加 synchronized 锁,如果该节点的 hash 不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }