这里我们想研究的是jdk1.8中ConcurrentHashMap的addCount(long x, int check)
方法。如下
所示在put方法的最后会触发addCount(long x, int check)
方法进行元素个数的统计。
我们再回顾一下另一个参数binCount :
- 在操作链表的分支
if (fh >= 0)
中 用于统计put前链表长度 - 在
if (f instanceof TreeBin)
分支中看到, binCount=2 , 该值被直接赋值常量 2
触发addCount的场景
- 在
putVal(K key, V value, boolean onlyIfAbsent)方法中最后会触发addCount(1L, binCount);
- 在
replaceNode
方法中会触发addCount(-1L, -1)
- 在
clear()
方法中触发addCount(delta, -1);
; - 在
compute
或者computeIfAbsent
或者computeIfPresent
方法中触发addCount((long)delta, binCount)
【1】addCount
添加到计数,若table太小且尚未调整大小,则触发transfer。如果当前正在扩容,则尝试帮助进行扩容调整。扩容后, 再次检查整个 ConcurrentHashMap 的容量占用情况, 因为此时扩容操作是落后于元素计数器的增加操作。
// x 表示需要 add 的数 // check < 0 ,不需要检查resize, check <= 1 only check if uncontended private final void addCount(long x, int check) { CounterCell[] as; long b, s; // counterCells 默认为null,如果as为null且没有成功更新BASECOUNT就进入if // 如果as不为null,直接进入if if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //记录是否存在竞争,true表示不存在竞争 boolean uncontended = true; // m = as.length - 1 //a = as[ThreadLocalRandom.getProbe() & m] // 如果a 不为null,那么更新a.value = a.value+x if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || // 这里还会将CAS结果赋予uncontended // 也就是CAS失败,表示有竞争 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } // check <= 1 直接返回 if (check <= 1) return; // 求和 baseCount+ΣCounterCell.value s = sumCount(); } // 这下面咱前面系列已经见过很多次了,这里就不再赘述了 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果桶数组过小的话, 触发扩容或者帮助扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { // 这几个条件也是有bug的 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); // 需要注意的是,在扩容后,这里又触发了一次 sumCount s = sumCount(); } } }
从上面代码可以看到,统计的最终还是依赖于fullAddCount(x, uncontended)
和sumCount()
。
jdk1.8 的 ConcurrentHashMap
使用一个 volatile
类型的变量baseCount
记录元素个数, 并通过 CAS 操作更新。
但是, 如果有两个线程并发使用 CAS 修改 baseCount
值, 导致其中一个操作失败后, 会调用 fullAddCount
方法初始化 counterCells
这个数组。该数组 的大小是 2^n。
所以严格意义上, ConcurrentHashMap 的计数器不是一个变量负责的, 而是由 baseCount 和 counterCells 共同维护的, 这一点可以从 sumCount()函数的源码得到验证, sumCount 函数会将 baseCount 的值和 counterCells 的值求和后返回。
注意: sumCount 函数的操作显然不是原子性的, 在加和的过程中, 每个 Cell 的值很明显是可以被其他线程修改的, 所以 ConcurrentHashMap 返回的 size 并不是一个精确的值!
//基本计数器值,主要在没有争用时使用,但在表初始化竞争期间也用作后备。通过CAS更新。 private transient volatile long baseCount; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ //旋转锁(locked via CAS),当扩容或者创建CounterCells时使用 private transient volatile int cellsBusy; // 存放CounterCell的数组,不为null时,其是2的N次幂 private transient volatile CounterCell[] counterCells; // 对应变量baseCount private static final long BASECOUNT=U.objectFieldOffset (k.getDeclaredField("baseCount")); // 对应变量cellsBusy private static final long CELLSBUSY=U.objectFieldOffset (k.getDeclaredField("cellsBusy")); // 对应变量 CounterCell.value private static final long CELLVALUE=U.objectFieldOffset (ck.getDeclaredField("value")); //数组的最大长度 tab.leght private static final int MAXIMUM_CAPACITY = 1 << 30; // 扩容戳移动位数 private static int RESIZE_STAMP_BITS = 16; /** * The maximum number of threads that can help resize. * Must fit in 32 - RESIZE_STAMP_BITS bits. */ // 最大扩容线程数 65535 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /** * The bit shift for recording size stamp in sizeCtl. */ // 扩容戳移位数 = 16 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
CounterCell是什么呢?用于并发(分布式)计数的填充Cell。改编自LongAdder和Striped64。有关解释,请参阅其内部文档。所以,我们还得研究下LongAdder和Striped64。
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
② 触发fullAddCount的分支
第一个if
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
进入if 方法体的场景:
- counterCells不为null
- counterCells为null,但是不能CAS更新
BASECOUNT=BASECOUNT+x
第二个if
if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
进入if 方法体的场景(as=counterCells):
- ① as 为null;
- ② as 不为null,但是
(m = as.length - 1) < 0
,这里其实先给 m 进行了赋值,然后判断。如果判断为真,那么说明counterCells是一个空数组。 - ③ ①②都不满足,
a = as[ThreadLocalRandom.getProbe() & m]) == null
。这里获取了一个CounterCell 赋予了a。 - ④ 不能更新CELLVALUE 为
a.value+x
。
③ 统计所有CounterCell的value和
也就是baseCount+ΣCounterCell.value
。这里需要注意的是,可能a.value
一直在变化。
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; // 将sum更新为baseCount long sum = baseCount; if (as != null) { // 遍历每一个CounterCell 获取value进行累加 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }
【2】LongAdder&Striped64
① LongAdder
LongAdder 继承自Striped64,也是java.util.concurrent.atomic
包下的一个原子类。想要搞明白fullAddCount,必须搞懂LongAdder。
一个或多个变量均持有 long sum(初始零)。当线程并发更新(比如add方法)时,变量集可能会动态增长以减少竞争。方法sum(或等价的longValue()方法)返回持有sum的变量的合计值。
当多个线程更新用于收集统计信息而不是细粒度同步控制的公共和时,此类通常优于AtomicLong。在低更新竞争下,这两个类具有相似的特性。但在高竞争情况下,此类的预期吞吐量显著更高,但代价是更高的空间消耗。
AtomicLong 的原理是 CAS 无锁更新, 当并发线程较多致使竞争激烈时, 效率会下降, 因为每一时刻其实至多只有一个线程能更新, 其他的线程只能不断地自旋等待, 效率甚至不如直接进入等待队列(有锁的方式), 至少可以避免频繁的线程切换开销以及死循环占用的 CPU。
LongAdder可以和ConcurrentHashMap
一起使用以维持一个 可伸缩的 frequency map(a form of histogram or multiset)。例如,要将一个计数添加到一个ConcurrentHashMap<String,LongAdder> freqs
,如果尚未初始化,那么可以使用 freqs.computeIfAbsent(k -> new LongAdder()).increment();
。
这个类继承自Number,但是没有定义方法诸如equals、hashCode和compareTo。因为实例常常会发生改变,所以作为集合的键不是那么有用。
如下是其add方法,可以看到ConcurrentHashMap的addCount方法是参考了这个add方法。
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
这里的基本逻辑就是用一个单一的变量存储基础值base, 在没有竞争发生的时候, 都在这个基础值上操作。发生竞争以后, 就尝试利用 Cell 的哈希表的元素上, 独立操作。
getProbe() 函数会返回针对当前线程计算的一个 hash 值。
如果 Cell 的哈希表尚未初始化, 或某个线程映射的 Cell 为空, 就由 longAccumulate 这个方法去初始化, 维护, 扩容 Cell 构成的 hash 表。
如下所示是其 sum 方法,与ConcurrentHashMap中sumCount()
方法可以说简直一致
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
ok,我们继续看看其父类Striped64。
② Striped64
其是一个64位值分散读写的公共表现和机制,子类有LongAdder、LongAccumulator、DoubleAdder以及DoubleAccumulator。
其维护了静态内部类@sun.misc.Contended static final class Cell 以及一些 volatile 修饰的成员。
// CPU个数 static final int NCPU = Runtime.getRuntime().availableProcessors(); // Cell数组,大小是2的N次幂 transient volatile Cell[] cells; // 基本值,主要在没有争用时使用,但在表初始化竞争期间也用作后备。通过CAS更新。 transient volatile long base; // /旋转锁(locked via CAS),当扩容或者创建CounterCells时使用 transient volatile int cellsBusy;
看到这些变量是不是很熟悉?ConcurrentHashMap
几乎是将LongAdder(Striped64)
实现了一遍。
这个类维护一个延迟初始化table(原子更新),以及一个额外的“base”字段。表的大小是2的N次幂。这个类中几乎所有的声明都是包私有的,由子类直接访问。
Table中的每一个元素都是Cell,其是AtomicLong 填充 一种变体(通过@sun.misc.Contended
)用来减少缓存争用。填充对于大多数原子类来说是多余的,因为它们通常不规则地分散在内存中,因此不会相互干扰。但是,驻留在数组中的原子对象将倾向于彼此相邻放置,因此在没有这种预防措施的情况下,通常会共享缓存行(对性能有巨大的负面影响)。
由于Cells 相对比较大,所以我们直到需要的时候才会创建它。当没有线程竞争时,所有的更新都针对于变量“base”。在第一次争用(CAS更新失败 base)时,table被初始化为大小2。当有进一步的竞争,表大小将会2倍扩容直到接近于2的N次幂(大于或等于CPU的个数的一个数)。Table的槽位在它们被需要前保持为null。
单个自旋锁(“cellsBusy”)用于初始化和调整table的大小,以及用新Cell填充table的槽位。不需要阻塞锁,当锁不可用时,线程会尝试其他槽位(或 base 变量)。在这些重试过程中,争用增加,局部性降低,这仍然优于替代方案。
通过ThreadLocalRandom维护的线程probe field 用作每个线程的哈希代码。我们让它们保持未初始化(为零)(如果它们以这种方式出现),直到它们在插槽0处竞争。然后将它们初始化为通常不会与其他值冲突的值。当执行更新操作时,竞争或者冲突由CAS的失败表示。在发生冲突时,如果表大小小于容量,则它的大小将加倍,除非其他线程持有锁。如果哈希槽为空,并且锁可用,则会创建一个新Cell。否则,如果插槽存在,则尝试CAS。重试通过“double hashing”进行,使用二次散列(Marsaglia XorShift)尝试查找空闲插槽。
// 返回当前线程的 PROBE 值 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
table大小是有上限的,因为当线程数超过CPU数时,假设每个线程都绑定到一个CPU,就会存在一个完美的哈希函数,将线程映射到插槽,从而消除冲突。当我们达到容量时,我们通过随机改变冲突线程的哈希代码来搜索此映射。由于搜索是随机的,并且冲突只通过CAS故障才知道,收敛速度可能很慢,并且由于线程通常不会永远绑定到CPU,因此可能根本不会发生。然而,尽管有这些限制,在这些情况下观察到的争用率通常较低。
当对Cell进行哈希运算的线程终止时,以及在table扩容时(导致没有线程在扩展掩码下对其进行哈希运算)单元格可能会变得未使用。我们不尝试检测或删除此类Cell,假设对于长期运行的实例,观察到的争用级别将再次出现,因此最终将再次需要这些Cell。对于存活时间短的Cell来说说,这并不重要。
如下所示在Striped64类中维护了一个静态内部类Cell。其是AtomicLong的变异进支持原子访问和CAS。
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
Striped64 的核心概念是, 它内部维护一个 Cell 的哈希表(hash table), 你可以把每个 Cell 当做一个 AtmoicLong。 当两个线程并发想在一个 LongAdder 实例上增加一个数值时, 那么很有可能, 两个线程可以在哈希表中不同的 Cell 上各自进行操作。 这可以尽可能的降低竞争发生。
与ConcurrentHashMap的addCount(long x, int check)方法触发了fullAddCount(x, uncontended);方法一样,LongAdder的add(long x)触发了longAccumulate(x, null, uncontended);方法。所以在第三部分我们来分析该方法。
总结来看, ConcurrentHashMap是不是把LongAdder和Striped64一些核心属性与方法实现了一遍?
【3】fullAddCount
其与Striped64的longAccumulate方法其实是一致的。
// See LongAdder version for explanation //X表示要增加的值,wasUncontended表示是否存在竞争 private final void fullAddCount(long x, boolean wasUncontended) { int h; // 获取当前线程的hash值 并尝试设置wasUncontended if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } //标记是否冲突 如果最后一个槽位非空,那么该值为true, boolean collide = false; // True if last slot nonempty // 无限循环 for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 如果counterCells存在且有元素 if ((as = counterCells) != null && (n = as.length) > 0) { // 如果 (n - 1) & h 槽位没有元素 -- 和HashMap的定位一样的原理 if ((a = as[(n - 1) & h]) == null) { // 如果没有竞争 if (cellsBusy == 0) { // Try to attach new Cell // 创建 CounterCell(x) CounterCell r = new CounterCell(x); // Optimistic create // 如果此时没有锁且成功加锁 // 尝试将CounterCell(x)放到rs[j = (m - 1) & h]位置 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; // 这里加锁后再次判断,避免counterCells发生了改变 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0;//释放锁 } // 如果成功创建则break,否则continue if (created) break; continue; // Slot is now non-empty } } //这行这句话的情形是, cells 已经被其他线程加锁,正在进行 Cell 初始化, 扩容, //或者创建新 Cell, 无法判断是否发生hash冲突 collide = false; } // 如果槽位有数据,并且存在竞争 // 说明上一次针对这个 Cell 的竞争失败了, 所以现在极有可能处于高度并发的情 //况下, 所以先不要再次竞争, 等到 cell 数组扩容以后, 再参与竞争 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 如果成功更新槽位数据value = value+x 那么break else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 上一个分支尝试 cas 操作失败了, 会进入这个分支判断 // 如果 cell 数组容量已经超过 CPU 核数量, 或者 cells 已经被扩容为新的一个数组 // collide 置为 false, 继续循环 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale //否则将collide 改为true,以说明需要进行扩容 else if (!collide) collide = true; // 不得已,进行2 倍扩容。前提是无锁且当前线程成功加锁 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale // 2 倍扩容 CounterCell[] rs = new CounterCell[n << 1]; //迁移数据 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 指向新的CounterCell[] rs counterCells = rs; } } finally { cellsBusy = 0;//解锁 } collide = false;//表示已经扩容 continue; // Retry with expanded table } // 执行到这里说明尝试扩容的时候, 没有抢到机会 // 为当前线程重新计算一个 hash, 下次争取尝试一个不一样的 Cell h = ThreadLocalRandom.advanceProbe(h); } // 如果没有旋转锁,且counterCells == as并且成功加锁,那么初始化counterCells // 并存放CounterCell(x) else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) {//再次判断 // 初始化为 2 CounterCell[] rs = new CounterCell[2]; // 存放x rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { // 释放锁 cellsBusy = 0; } if (init) break; } // 作为备用手段,BASECOUNT+x else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }