ConcurrentHashMap
Tips:其实今天讲它肯定是大概的过一下,它既然是一个线程安全的容器,那么线程安全也要涉及到很多的知识点,比如
- 悲观锁与乐观锁
- 原子性,指令有序性和线程可见性
- 无锁算法
- 内存屏障
- Java内存模型
这些目前等讲JVM的时候我们再一起去探讨吧,我们今天主要是过一下它,等后面把知识点串起来就会明白的。
跟HashTable 的区别,1.7和1.8的比较
ConcurrentHashMap是conccurrent家族中的一个类,由于它可以高效地支持并发操作,以及被广泛使用,经典的开源框架Spring的底层数据结构就是使用ConcurrentHashMap实现的。与同是线程安全的老大哥HashTable相比,它已经更胜一筹,因此它的锁更加细化,而不是像HashTable一样为几乎每个方法都添加了synchronized锁,这样的锁无疑会影响到性能。
1.7和1.8实现线程安全的思想也已经完全变了其中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。
继承结构
跟HashMap就是一模一样
其实一个类是用来干嘛的再类的最开始的介绍是很详细的,但是我的渣渣英语水平,太难了,如果有能力的童鞋可以去好好看看。总结一下说了啥:
- JDK1.8底层是散列表+红黑树
- ConCurrentHashMap支持高并发的访问和更新,它是线程安全的
- 检索操作不用加锁,get方法是非阻塞的
- key和value都不允许为null
常量
//表的最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //默认表的容量 private static final int DEFAULT_CAPACITY = 16; //最大数组长度 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默认并发级别 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //负载因子 private static final float LOAD_FACTOR = 0.75f; //树化阈值 static final int TREEIFY_THRESHOLD = 8; //去树化阈值 static final int UNTREEIFY_THRESHOLD = 6; //树化的最小容量 static final int MIN_TREEIFY_CAPACITY = 64; //转移的最小值 private static final int MIN_TRANSFER_STRIDE = 16; //生成sizeCtl的最小位数 private static int RESIZE_STAMP_BITS = 16; //进行扩容允许的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //sizeCtl所需要的偏移位数 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //标志值 static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash //cpu数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //序列化属性 private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("segments", Segment[].class), new ObjectStreamField("segmentMask", Integer.TYPE), new ObjectStreamField("segmentShift", Integer.TYPE) }; 复制代码
大部分的和HashMap差不多,只有少数的不同
成员变量
//node数组,第一次插入节点时初始化 都加了内存屏障 transient volatile Node<K,V>[] table; //用于扩容 private transient volatile Node<K,V>[] nextTable; //计算器值,通过CAS修改值,没有竞争时使用,或者出现多线程初始化时回滚 private transient volatile long baseCount; //初始化和扩容的标志,concurrent包中有很多类似用法 // -1 初始化中; -N (N-1)个线程在扩容;table没有数据 初始化的大小 ; table有数据 下一次扩容的大小 // 非常重要的一个属性,源码中的英文翻译,直译过来是下面的四行文字的意思 // sizeCtl = -1,表示有线程正在进行真正的初始化操作 // sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作 // sizeCtl > 0,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的threshold // sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量 //sizeCtl = -(1 + nThreads)这个,网上好多都是用第二句的直接翻译去解释代码,这样理解是错误的 // 默认构造的16个大小的ConcurrentHashMap,只有一个线程执行扩容时,sizeCtl = -2145714174, // 但是照这段英文注释的意思,sizeCtl的值应该是 -(1 + 1) = -2 // sizeCtl在小于0时的确有记录有多少个线程正在执行扩容任务的功能,但是不是这段英文注释说的那样直接用 -(1 + nThreads) // 实际中使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重 叠, // 当有n个线程正在执行扩容时,sizeCtl在值变为 (基数 + n) // 1.8.0_111的源码的383-384行写了个说明:A generation stamp in field sizeCtl ensures that resizings do not overlap. private transient volatile int sizeCtl; /** * The next table index (plus one) to split while resizing. */ //transfer的table索引 private transient volatile int transferIndex; //扩容或创建counterCells的自旋锁 private transient volatile int cellsBusy; // CounterCell数组 private transient volatile CounterCell[] counterCells; // views private transient KeySetView<K,V> keySet; private transient ValuesView<K,V> values; private transient EntrySetView<K,V> entrySet; 复制代码
这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。
- 负数代表正在进行初始化或扩容操作
- -1代表正在初始化
- -N 表示有N-1个线程正在进行扩容操作
- 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
构造方法
其实和HashMap是大同小异的,此时ConcurrentHashMap的构造方法逻辑和HashMap基本一致,只是多了concurrencyLevel和SizeCtl。 而且此时也没有初始化table,它是要等到第一次put的时候才初始化table,
成员方法
ConcurrentHashMap#initTable()
再我们put方法中,首先会判断我们存放数据的table是否为null如果为null,这个时候就要初始化我们的方法了
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0)// sizeCtl < 0 标示有其他线程正在进行初始化操作,把线程让出cpu,对于table的厨师操作,只能有一个线程在进行 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //如果把sizeCtl原子更新为-1成功,则当前线程进入初始化 // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环 // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU // 如果下一次循环更新完毕了,则table.length!=0,退出循环 try {<br> // 为什么还要判断,因为:如果走到下面的finally改变了sizeCtl值,有可能其他线程是会进入这个逻辑的 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认大小是16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); // 0.75*n,下一次扩容阈值 } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码
再这个初始化过程中,就已经有乐观锁的实现了。 可以看出table的初始化在一个cas方法中进行,当table为null或者长度为0时进入while方法。
进入之后判断sizeCtl的指,如果小于0则线程让步。由于初始状态sizeCtl是等于0的,说明前面已经有线程进入了elseif这部分,将sc的值置为-1,表示正在初始化。
如果sc大于0,则取sc,否则取默认容量16。然后计算下一次元素数量达到多少时需要resize。总结一下初始化方法
- 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
- 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
- 否则的话初始化一个默认大小(16)的数组
- 然后设置sizeCtl的值为数组长度的3/4
ConcurrentHashMap#transfer(Node
[],Node
)
该方法的目的是实现数组的转移,即ConcurrentHashMap的扩容逻辑。就是HashMap的resize方法
在ConcurrentHashMap中,扩容虽然和HashMap一样,将Node数组的长度变为原来的两倍,但是为了保证多线程的同步性,ConcurrentHashMap引入了nextTable属性。在扩容过程中,大致可以分为三步:
- 初始化一个空数组nextTable,长度为node数组的两倍,用作扩容后的数组的临时存储。
- 将原来的node数组复制到nextTable中。
- 将nextTable赋给原来的Node数组,并将nextTable置为null,修改sizeCtl。 ConcurrentHashMap通过遍历实现数组的复制,根据数组中Node节点的类型不同做不同处理。
- (1)普通Node类型,表示链表中的节点,根据其下标i放入对应的nextTable中i和n+i的位置,采用头插法,链表顺序与原来相反
- (2)ForwardingNode类型,表示已经处理过
- (3)TreeBin类型,表示红黑树的节点,进行红黑树的复制,并考虑是否需要去树化。
- (4)null,表示此处没有节点,加入ForwardingNode节点。根据上面的ConcurrentHashMap#helpTransfer(Node<K,V>[], Node<K,V>)可以知道,ForwardingNode类型的节点会触发其它线程加入数组的复制过程,即并发扩容。 另外,ConcurrentHashMap复制数组时的遍历是从n到0进行遍历的,并且不会完全遍历,而是按照线程数量分成若干个小人物,每个线程每次负责复制stride(步进)个桶([transfer-stride, transfer-1])。任务完成后可以再次申请。stride根据cpu数量而定,最小为16。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //确定stride if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating //初始化,即使多个线程同时进入,也只不过是创建了多个Node<K,V>[]数组nt,在赋值给nextTab时后者覆盖前者,线程必然安全 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //数组元素复制结束的标志位 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //advance表示该节点是否处理成功,处理成功后继续遍历,否则该节点再次处理(CAS) boolean advance = true; //循环是否接受的标志 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //数组复制结束后的操作 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); 原数组长度的1.75倍,即扩容后的0.75倍 return; } //利用CAS方法更新sizeCtl,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍历到的节点为空 则放入ForwardingNode指针 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //synchronized锁保证节点复制的线程安全 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //链表节点,头插法得到ln和hn两条链表,分别对应nextTable中下标i和n+i的元素 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //红黑树节点,先尾插法得到由TreeNode组成的ln和hn两条链表,分别对应nextTable中下标i和n+i的元素,然后作为参数传入TreeBin的构造方法 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } 复制代码
我只能说复杂的一批。扩容是最复杂的,多线程的数据复制,还有红黑树的转换。脑子不够用。大神们去探究吧,我就记记结论吧