一、背景:
线程不安全的HashMap
因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
效率低下的HashTable容器
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。
二、应用场景
CHM适用于读者数量超过写者时,当写者数量大于等于读者时,CHM的性能是低于Hashtable和synchronized Map的。这是因为当锁住了整个Map时,读操作要等待对同一部分执行写操作的线程结束。CHM适用于做cache,在程序启动时初始化,之后可以被多个请求线程访问。正如Javadoc说明的那样,CHM是HashTable一个很好的替代,但要记住,CHM的比HashTable的同步性稍弱。
三、源码分析:
3.1 jdk1.7的源码
3.1.1锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。
ConcurrentHashMap的数据结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
3.1.2 ConcurrentHashMap的主要数据结构
ConcurrentHashMap(1.7及之前)中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系
ConcurrentHashMap的成员变量
//初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的并发等级(下面会叙述作用) static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment数量 static final int MAX_SEGMENTS = 1 << 16; static final int RETRIES_BEFORE_LOCK = 2; /** * Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组 * 每个Segment相当于一个子Hash表 */ final Segment<K,V>[] segments; /** * segmentMask和segmentShift主要是为了定位段 */ final int segmentMask; final int segmentShift;
Segment的结构
static final class Segment<K,V> extends ReentrantLock implements Serializable { //volatile,这使得能够读取到最新的 table值而不需要同步 transient volatile HashEntry<K,V>[] table; //count用来统计该段数据的个数 transient int count; //modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变 transient int modCount; //threashold用来表示需要进行rehash的界限值 transient int threshold; //loadFactor表示负载因子。 final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { //略 } private void rehash(HashEntry<K,V> node) { //略 } private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //略 } private void scanAndLock(Object key, int hash) { //略 } final V remove(Object key, int hash, Object value) { //略 } final boolean replace(K key, int hash, V oldValue, V newValue) { //略 } final V replace(K key, int hash, V value) { //略 } final void clear() { //略 } }
HashEntry的结构
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
3.1.3 hash槽的的个数
为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。
3.1.4 定位操作:
private Segment<K,V> segmentForHash(int h) { long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); }
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。
默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。
3.1.5remove(key)操作
/** * 先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时, * 只要它们所在的段不相同,它们就可以同时进行。 */ public V remove(Object key) { int hash = hash(key); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); }
下面是Segment的remove方法实现:
final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); //pred用来记录待每次循环的前一个节点 HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; //当找到了待删除及节点的位置 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { //如果待删除节点的前节点为null,即待删除节点时链头节点,此时把第2 //个结点设为头结点 if (pred == null) setEntryAt(tab, index, next); else //如果有前节点,则待删除节点的前节点的next指向待删除节点的的下 //一个节点,删除成功 pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
3.1.5 get操作
public V get(Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
get 逻辑比较简单:只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。
3.1.6 put操作
put操作是委托给段的put方法。下面是段的put方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 1. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry HashEntry<K,V> node = tryLock() ? null : // 尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut() //自旋获取锁。 scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 2. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key // 是否相等,相等则覆盖旧的 value if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } // 3. 为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容 else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。
- 是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
- 如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
3.1.7 containsKey和containsValue
public boolean containsKey(Object key) { Segment<K,V> s; // same as get() except no need for volatile value read HashEntry<K,V>[] tab; int h = hash(key); //根据key定位到segment,遍历HashEntry判断是否具有key long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return true; } } return false; } public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { //如果自旋的次数超过RETRIES_BEFORE_LOCK,强制堆所有segments加锁 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; //遍历Segment for (int j = 0; j < segments.length; ++j) { HashEntry<K,V>[] tab; Segment<K,V> seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry<K,V> e; //遍历HashEntry for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; }