概述
Java Review - 并发编程_原子操作类原理剖析中提到了 AtomicLong通过CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经很好了,但是JDK开发组并不满足于此。使用AtomicLong时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS的操作,而这会白白浪费CPU资源。
因此JDK 8新增了一个原子性递增或者递减类LongAdder用来克服在高并发下使用AtomicLong的缺点。
既然AtomicLong的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,是不是就解决了性能问题?是的,LongAdder就是这个思路。
下面通过图来理解两者设计的不同之处
使用LongAdder时,则是在内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。
另外,多个线程在争夺同一个Cell原子变量时如果失败了,它并不是在当前Cell变量上一直自旋CAS重试,而是尝试在其他Cell的变量上进行CAS尝试,这个改变增加了当前线程重试CAS成功的可能性。
最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base返回的。
LongAdder维护了一个延迟初始化的原子性更新数组(默认情况下Cell数组是null)和一个基值变量base。由于Cells占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。
当一开始判断Cell数组是null并且并发线程较少时,所有的累加操作都是对base变量进行的。保持Cell数组的大小为2的N次方,在初始化时Cell数组中的Cell元素个数为2,数组里面的变量实体是Cell类型。Cell类型是AtomicLong的一个改进,用来减少缓存的争用,也就是解决伪共享问题。
对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用@sun.misc.Contended注解对Cell类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。
小Demo
import java.util.concurrent.atomic.LongAdder; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/11/30 22:52 * @mark: show me the code , change the world */ public class AtomicLongTest { //(10)创建Long型原子计数器 // private static AtomicLong atomicLong = new AtomicLong(); private static LongAdder longAdder = new LongAdder(); //(11)创建数据源 private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0}; private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 0, 5, 6, 0, 56, 0}; public static void main(String[] args) throws InterruptedException { //(12)线程one统计数组arrayOne中0的个数 Thread threadOne = new Thread(() -> { int size = arrayOne.length; for (int i = 0; i < size; ++i) { if (arrayOne[i].intValue() == 0) { longAdder.increment(); } } }); //(13)线程two统计数组arrayTwo中0的个数 Thread threadTwo = new Thread(() -> { int size = arrayTwo.length; for (int i = 0; i < size; ++i) { if (arrayTwo[i].intValue() == 0) { longAdder.increment(); } } }); //(14)启动子线程 threadOne.start(); threadTwo.start(); //(15)等待线程执行完毕 threadOne.join(); threadTwo.join(); System.out.println("count 0:" + longAdder.sum()); } }
源码分析
为了解决高并发下多线程对一个变量CAS争夺失败后进行自旋而造成的降低并发性能问题,LongAdder在内部维护多个Cell元素(一个动态的Cell数组)来分担对单个变量进行争夺的开销。
先来思考几个问题
(1)LongAdder的结构是怎样的?
(2)当前线程应该访问Cell数组里面的哪一个Cell元素?
(3)如何初始化Cell数组?
(4)Cell数组如何扩容?
(5)线程访问分配的Cell元素有冲突后如何处理?
(6)如何保证线程操作被分配的Cell元素的原子性?
LongAdder类继承自Striped64类,在Striped64内部维护着三个变量。
LongAdder的真实值其实是base的值与Cell数组里面所有Cell元素中的value值的累加,base是个基础值,默认为0。
cellsBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。
/** * Padded variant of AtomicLong supporting only raw accesses plus CAS. * * JVM intrinsics note: It would be possible to use a release-only * form of CAS here, if it were provided. */ @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
Cell的构造很简单,其内部维护一个被声明为volatile的变量,这里声明为volatile是因为线程操作value变量时没有使用锁,为了保证变量的内存可见性这里将其声明为volatile的。
cas函数通过CAS操作,保证了当前线程更新时被分配的Cell元素中value值的原子性。
Cell类使用@sun.misc.Contended修饰是为了避免伪共享。
到这里我们回答了问题1和问题6。
重要的方法
long sum()
返回当前的值,内部操作是累加所有Cell内部的value值后再累加base。
/** * Returns the current sum. The returned value is <em>NOT</em> an * atomic snapshot; invocation in the absence of concurrent * updates returns an accurate result, but concurrent updates that * occur while the sum is being calculated might not be * incorporated. * * @return the sum */ public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值。
reset
重置操作
/** * Resets variables maintaining the sum to zero. This method may * be a useful alternative to creating a new adder, but is only * effective if there are no concurrent updates. Because this * method is intrinsically racy, it should only be used when it is * known that no threads are concurrently updating. */ public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } }
base置为0,如果Cell数组有元素,则元素值被重置为0
sumThenReset
sum的改造版本
/** * Equivalent in effect to {@link #sum} followed by {@link * #reset}. This method may apply for example during quiescent * points between multithreaded computations. If there are * updates concurrent with this method, the returned value is * <em>not</em> guaranteed to be the final value occurring before * the reset. * * @return the sum */ public long sumThenReset() { Cell[] as = cells; Cell a; long sum = base; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; a.value = 0L; } } } return sum; }
在使用sum累加对应的Cell值后,把当前Cell的值重置为0, base重置为0。这样,当多线程调用该方法时会有问题,比如考虑第一个调用线程清空Cell的值,则后一个线程调用时累加的都是0值。
longValue()
等价于sum
/** * Equivalent to {@link #sum}. * * @return the sum */ public long longValue() { return sum(); }
add(long x)
/** * Adds the given value. * * @param x the value to add */ public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { // 1 boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || // 2 (a = as[getProbe() & m]) == null || // 3 !(uncontended = a.cas(v = a.value, v + x))) // 4 longAccumulate(x, null, uncontended); // 5 } } /** * CASes the base field. */ final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }
代码(1)首先看cells是否为null,如果为null则当前在基础变量base上进行累加,这时候就类似AtomicLong的操作。 如果cells不为null或者线程执行代码(1)的CAS操作失败了,则会去执行代码(2)。
代码(2)(3)决定当前线程应该访问cells数组里面的哪一个Cell元素,如果当前线程映射的元素存在则执行代码(4),使用CAS操作去更新分配的Cell元素的value值,如果当前线程映射的元素不存在或者存在但是CAS操作失败则执行代码(5)。
其实将代码(2)(3)(4)合起来看就是获取当前线程应该访问的cells数组的Cell元素,然后进行CAS更新操作,只是在获取期间如果有些条件不满足则会跳转到代码(5)执行。
另外当前线程应该访问cells数组的哪一个Cell元素是通过getProbe() & m进行计算的,其中m是当前cells数组元素个数-1,getProbe()则用于获取当前线程中变量threadLocalRandomProbe的值,这个值一开始为0,在代码(5)里面会对其进行初始化。并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素value值更新的原子性,到这里我们回答了问题2和问题6。
longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
cells数组被初始化和扩容的地方
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //(6) 初始化当前线程的变量threadLocalRandomProbe的值 int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // h = getProbe(); wasUncontended = true; } boolean collide = false; for (; ; ) { Cell[] as; Cell a; int n; long v; if ((as = cells) ! = null && (n = as.length) > 0) {//(7) if ((a = as[(n -1) & h]) == null) {//(8) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) ! = null && (m = rs.length) > 0 && rs[j = (m -1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (! wasUncontended) // CAS already known to fail wasUncontended = true; //当前Cell存在,则执行CAS设置(9) else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //当前Cell数组元素个数大于CPU个数(10) else if (n >= NCPU || cells ! = as) collide = false; // At max size or stale //是否有冲突(11) else if (! collide) collide = true; //如果当前元素个数没有达到CPU个数并且有冲突则扩容(12) else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //12.1 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //12.2 cellsBusy = 0; } //12.3 collide = false; continue; // Retry with expanded table } //(13)为了能够找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数 h = advanceProbe(h); } //初始化Cell数组(14) else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { //14.1 Cell[] rs = new Cell[2]; //14.2 rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //14.3 cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
上面代码比较复杂,这里我们主要关注问题3、问题4和问题5。
当每个线程第一次执行到代码(6)时,会初始化当前线程变量threadLocalRandomProbe的值,上面也说了,这个变量在计算当前线程应该被分配到cells数组的哪一个Cell元素时会用到。
cells数组的初始化是在代码(14)中进行的,其中cellsBusy是一个标示,为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素、通过CAS操作来进行0或1状态的切换,这里使用casCellsBusy函数。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。
如代码(14.1)初始化cells数组元素个数为2,然后使用h&1计算当前线程应该访问celll数组的哪个位置,也就是使用当前线程的threadLocalRandomProbe变量值&(cells数组元素个数-1),然后标示cells数组已经被初始化,最后代码(14.3)重置了cellsBusy标记。
显然这里没有使用CAS操作,却是线程安全的,原因是cellsBusy是volatile类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改cellsBusy的值。在这里初始化的cells数组里面的两个元素的值目前还是null。这里回答了问题3,知道了cells数组如何被初始化。
cells数组的扩容是在代码(12)中进行的,对cells扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前cells的元素个数小于当前机器CPU个数并且当前多个线程访问了cells中同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。
这里为何要涉及CPU个数呢?只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组元素个数与CPU个数一致时,每个Cell都使用一个CPU进行处理,这时性能才是最佳的。
代码(12)中的扩容操作也是先通过CAS设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码(12.1)将容量扩充为之前的2倍,并复制Cell元素到扩容后数组。
另外,扩容后cells数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是null。这里回答了问题4。
在代码(7)(8)中,当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null,则新增一个Cell元素到cells数组,并且在将其添加到cells数组之前要竞争设置cellsBusy为1。
代码(13)对CAS失败的线程重新计算当前线程的随机值threadLocalRandomProbe,以减少下次访问cells元素时的冲突机会。这里回答了问题5。
LongAdder 小结
JDK 8中新增的LongAdder原子性操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。
另外,数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也就是避免了伪共享,这对性能也是一个提升。
LongAccumulator
概述
LongAdder类是LongAccumulator的一个特例, LongAccumulator比LongAdder的功能更强大。
例如下面的构造函数,其中accumulatorFunction是一个双目运算器接口,其根据输入的两个参数返回一个计算值,identity则是LongAccumulator累加器的初始值。
/** * Creates a new instance using the given accumulator function * and identity element. * @param accumulatorFunction a side-effect-free function of two arguments * @param identity identity (initial value) for the accumulator function */ public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) { this.function = accumulatorFunction; base = this.identity = identity; }
看看 LongBinaryOperator
LongAccumulator相比于LongAdder,可以为累加器提供非0的初始值,后者只能提供默认的0值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在构造LongAccumulator时传入自定义的双目运算器即可,后者则内置累加的规则。
LongAdder#add vs LongAccumulator#accumulate
LongAdder#add方法
LongAccumulator#accumulate方法
LongAccumulator相比于LongAdder的不同在于,在调用casBase时后者传递的是b+x,前者则使用了r = function.applyAsLong(b = base, x)来计算。
另外,前者在调用longAccumulate时传递的是function,而后者是null。从下面的代码可知,当fn为null时就使用v+x加法运算,这时候就等价于LongAdder,当fn不为null时则使用传递的fn函数计算。
LongAdder类是LongAccumulator的一个特例,只是后者提供了更加强大的功能,可以让用户自定义累加规则。
小Demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAccumulator; import java.util.function.LongBinaryOperator; import java.util.stream.IntStream; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/1 0:02 * @mark: show me the code , change the world */ public class LongAccumulator1 { public static void main(String[] args) { testAccumulate(); } private static void testAccumulate() { LongBinaryOperator op = (x, y) -> 2 * x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 100) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); System.out.format("Add: %d\n", accumulator.getThenReset()); executor.shutdown(); } }
原子类总结
并发包中的原子性操作类都是使用非阻塞算法CAS实现的,这相比使用锁实现原子性操作在性能上有很大提高。
梳理了AtomicLong类的实现原理,然后JDK 8中新增的LongAdder类和LongAccumulator类的原理。