初始化每个segment
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法中需要通过这两个参数来初始化数组中的每个segment
if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFacotr);
上述代码中的cap是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1就是2的N次方。segment的容量threshold = (int)cap*loadFactor
,默认情况下initialCapacity
等于16,也就是容纳16个元素,loadFactor(负载因子)等于0.75,通过计算cap=1,threshold=0
定位Segment
ConcurrentHashMap使用分段所Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到某一个Segment。ConcurrentHashMap首先选用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。
private static int hash(int h) { h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
使用再散列算法,目的为了减少散列冲突,使元素能够均有地分步在不同的Segment上,从而提高容器的存取效率。假如散列的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段所也会失去意义。在JDK 1.7中ConcurrentHashMap通过以下散列算法定位segment
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
Put/Get/Size操作
由于JDK1.7和1.8的底层实现和方法有所不同,所以我们这里分别介绍下:
Put操作
JDK1.7中的put操作如下
static class Segment<K,V> extends ReentrantLock implements Serializable
从上述Segment的继承体系中可以看出,Segment实现了ReentrantLock,也就带有锁的功能。由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量是,必须加锁。
- 对segment加锁,在加锁时,会通过继承ReentrantLock的tryLock()方法尝试获取锁,若获取成功,就直接在相应的位置插入;若已经有线程获取了该Segment的锁,那当前线程会以自旋的方式继续调用tryLock()方法获取锁,超过指定次数就挂起,等待唤醒
- 判断是否需要对Segment里的HashEntry数组进行扩容
- 定位添加元素的位置,然后将其放在HashEntry数组中
其中如果需要扩容该如何扩容呢?
- 在插入元素前会先判断Segment里的HashEntry数组是否超过容量threshold,如果超过了阈值,则对数组进行扩容。
- 建立一个容量是原来两倍的数组,然后将原数组中的元素再散列后插入到新数组中。为了高效,
ConcurrentHashMap
不会对整个容器进行扩容,而是只对某个segment进行扩容
这里的扩容方式与HashMap的扩容方式稍有不同,HashMap是在插入元素之后判断元素是否已经达到容量,如果达到了就进行扩容,但是有可能扩容之后就没有新元素插入,则HashMap就进行了一次无效的扩容。
JDK1.8中的put操作如下
对当前table进行无条件自循环put成功,可以分成以下步骤
- 如果没有初始化就调用initTable()方法来进行初始化过程,若通过散列得到的位置中没有节点,则不加锁直接将节点通过CAS操作插入
- 如果发现该桶中有一个节点,需要扩容则进行扩容
- 如果存在hash冲突,就加锁来保证线程安全,这里存在两种情况:一种是链表形式就直接遍历到尾部插入;另一种是红黑树形式,就按红黑树结构插入。判断依据是如果链表的数量大于阈值,则先转换为红黑树结构,再一次进入循环
如果添加成功,则调用addCount()统计size,并且检查是否需要扩容。
Get操作
JDK1.7中的get操作如下
public V get(Object key) { int hash = hash(key.hashCode()); //1,先经过一次再散列 return segmentFor(hash).get(key, hash); //2,使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素 }
JDK1.8中的get操作如下
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; , int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 1,计算hash值,定位到该table索引位置,如果是首节点符合,则返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 2. 如果遇到扩容的时候,会调用标志正在扩容节点Forwarding Node的find方法,查找该节点,若匹配则返回 // 若查找到就返回 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 3,若既不是首节点也不是forwarding node,则向下遍历 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get操作的高效体现在整个get过程不需要加锁,除非读到的值是空才会加锁重读,因为它的get方法里将使用的共享变量都定义成volatile类型。例如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值(由Java内存模型的happen before原则保证),但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值)。在get操作期间,只需要读取共享变量count和value值,所以不需要加锁
Size操作
计算ConcurrentHashMap的元素大小,就必须要统计Segment里的元素的大小后求和。上面说过Segment的全局变量count是一个volatile变量,在并发的场景下,可能会导致计算出来的size值和实际的size值有偏差。因为在计算count值的时候,可能有新数据的插入,导致结果的不准确。那么,最安全的做法就是在统计size的时候把所有Segment的put、remove和clean方法全部锁住,但这种做法显然是非常低效的。
JDK1.7中的size操作如下
JDK 1.7中是如下处理的,先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则在采用加锁的方式来统计所有Segment的大小。使用modCount变量判断容器是否发生了变化,在put、remove和clean方法里操作元素前都会将变量modCount进行加1操作,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; // 进行2次统计 last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
JDK1.8中的size操作如下
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; // 变化的数量 long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
写时复制
JUC集合包中的List和Set实现类按照写时复制的实现原理包括: CopyOnWriteArrayList和 CopyOnWriteArraySet。
写时复制原理解决了并发修改异常,每当有写入的时候,这个时候CopyOnWriteArrayList 底层实现添加的原理是先copy出一个容器(可以简称副本),再往新的容器里添加这个新的数据,最后把新的容器的引用地址赋值给了之前那个旧的的容器地址,但是在添加这个数据的期间,其他线程如果要去读取数据,仍然是读取到旧的容器里的数据。
- CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。CopyOnWriteArrayList是支持高并发的。
- CopyOnWriteArraySet相当于线程安全的HashSet,它继承于AbstractSet类。 内部包含一个
CopyOnWriteArrayList
对象(聚合关系),它是通过CopyOnWriteArrayList实现的
这种方式采用了写时加锁复制,读时读旧容器不需任何处理,这种方式有两个显著的缺点:
- 内存占用问题,很明显,两个数组同时驻扎在内存中,如果实际应用中,数据比较多,而且比较大的情况下,占用内存会比较大,针对这个其实可以用ConcurrentHashMap来代替。
- 数据一致性问题,CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器
基于此我们一般不会大量使用。
并发队列类
JUC集合包中Queue的实现类包括: ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, ConcurrentLinkedQueue和ConcurrentLinkedDeque。它们的框架如下图所示:
依据队列是否会发生阻塞分为两种,阻塞队列
- ArrayBlockingQueue,是数组实现的线程安全的有界的阻塞队列。
- LinkedBlockingQueue是,单向链表实现的(指定大小)阻塞队列,该队列按 FIFO(先进先出)排序元素
- LinkedBlockingDeque是,双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
非阻塞队列:
- ConcurrentLinkedQueue,是单向链表实现的无界队列,该队列按 FIFO(先进先出)排序元素。
- ConcurrentLinkedDeque,是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。
简单说下有界和无界的概念:
- 有界队列:就是有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0,只是在生产者和消费者中做中转用的 SynchronousQueue。
- 无界队列:指的是没有设置固定大小的队列。这些队列的特点是可以直接入列,直到溢出。当然现实几乎不会有到这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的体验上,就相当于 “无界”。比如没有设定固定大小的 LinkedBlockingQueue
一般为了防止撑爆JVM内存,采用有界队列的形式