Java8原子弹类之LongAdder源码分析

简介: JDK 8开始,针对Long型的原子操作, Java又提供了LongAdder. LongAccumulator; 针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。

Java8原子弹类之LongAdder源码分析

JDK 8开始,针对Long型的原子操作, Java又提供了LongAdder. LongAccumulator; 针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。


Striped64 UML


5.png


AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发场景下仍不够快,若再提高性能,咋办?

把一个变量拆成多份,变为多个变量,类似ConcurrentHashMap分段锁。如下图,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量。当多个线程并发累加的时:


如果并发度低,就直接加到base变量上

并发度高,冲突大,平摊到这些Cell上

最后取值时,再把base和这些Cell求sum运算。

4.png



核心 API

add

3.png


public void add(long x) {

   Cell[] as; long b, v; int m; Cell a;

   // 判断cells是否还没被初始化,并且尝试对value值进行cas操作

   if ((as = cells) != null || !casBase(b = base, b + x)) {

       // 如果cells已经初始化 或 cas操作失败,则运行if内部语句

       boolean uncontended = true;

       // cell[]数组是否初始化

       // cell[]数组虽然初始化了但是数组长度是否为0

      // 该线程所对应cell是否为null

      // 尝试对该线程对应的cell单元进行cas更新是否失败,如果这些条件有一条为true,则运行最为核心的方法longAccumulate

       if (as == null || (m = as.length - 1) < 0 ||

           (a = as[getProbe() & m]) == null ||

           !(uncontended = a.cas(v = a.value, v + x)))

           longAccumulate(x, null, uncontended);

   }

}


包含一个Cell数组,Striped64的一个内部类。即AtomicLong的填充变体且只支持原始访问和CAS,有一个value变量,并且提供cas方法更新value值。


/**

 * 处理涉及初始化,调整大小,创建新Cell,和/或争用的更新案例

 *

 * @param x 值

 * @param fn 更新方法

 * @param wasUncontended 调用

 */

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

    int h;

    // 获取线程probe的值

    if ((h = getProbe()) == 0) {

        // 值为0,则强制初始化

        ThreadLocalRandom.current();

        h = getProbe();

        wasUncontended = true;

    }

    boolean collide = false; // True if last slot nonempt

    for (;;) {

        Cell[] as; Cell a; int n; long v;

        // 这个if分支处理上述四个条件中的前两个

        // 此时cells数组已初始化 && 长度大于0

        if ((as = cells) != null && (n = as.length) > 0) {

            // 线程对应cell为null

            if ((a = as[(n - 1) & h]) == null) {

                // 若busy锁未被占有

                if (cellsBusy == 0) {       // Try to attach new Cell

                    // 新建一个cell

                    Cell r = new Cell(x);   // Optimistically create

                    // 检测busy是否为0,并且尝试锁busy

                    if (cellsBusy == 0 && casCellsBusy()) {

                        boolean created = false;

                        try { // Recheck under lock

                            Cell[] rs; int m, j;

                            // 再次确认线程probe所对应的cell为null,将新建的cell赋值

                            if ((rs = cells) != null &&

                                (m = rs.length) > 0 &&

                                rs[j = (m - 1) & h] == null) {

                                rs[j] = r;

                                created = true;

                            }

                        } finally {

                            // 解锁

                            cellsBusy = 0;

                        }

                        if (created)

                            break;

                        //如果失败,再次尝试

                        continue;           // Slot is now non-empty

                    }

                }

                collide = false;

            }

            //置为true后交给循环重试

            else if (!wasUncontended)       // CAS already known to fail

                wasUncontended = true;      // Continue after rehash

            //尝试给线程对应的cell update

            else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                         fn.applyAsLong(v, x))))

                break;

            else if (n >= NCPU || cells != as)

                collide = false;            // At max size or stale

            else if (!collide)

                collide = true;

            //在以上条件都无法解决的情况下尝试扩展cell

            else if (cellsBusy == 0 && casCellsBusy()) {

                try {

                    if (cells == as) {      // Expand table unless stale

                        Cell[] rs = new Cell[n << 1];

                        for (int i = 0; i < n; ++i)

                            rs[i] = as[i];

                        cells = rs;

                    }

                } finally {

                    cellsBusy = 0;

                }

                collide = false;

                continue;                   // Retry with expanded table

            }

            h = advanceProbe(h);

        }

        //此时cells还未进行第一次初始化,进行初始化

        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

            boolean init = false;

            try {                           // Initialize table

                if (cells == as) {

                    Cell[] rs = new Cell[2];

                    rs[h & 1] = new Cell(x);

                    cells = rs;

                    init = true;

                }

            } finally {

                cellsBusy = 0;

            }

            if (init)

                break;

        }

        //busy锁不成功或者忙,则再重试一次casBase对value直接累加

        else if (casBase(v = base, ((fn == null) ? v + x :

                                    fn.applyAsLong(v, x))))

            break;                          // Fall back on using base

    }

}

 /**

  * Spinlock (locked via CAS) used when resizing and/or creating Cells.

  * 通过cas实现的自旋锁,用于扩大或者初始化cells

  */

 transient volatile int cellsBusy;



从以上分析来看,longAccumulate就是为了尽量减少多个线程更新同一个value,实在不行则扩大cell


SUM(最终一致性问题)

并没有对 cells 数组加锁,所以是最终一致性,而非强一致性。类似 concurrenthashmap#clear(一边执行清空操作,一边还有线程放入数据,clear调用完完毕后再读取)。因此,适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景。


/**

* Returns the current sum.  返回值不是个原子快照;无并发修改的调用可以返回精确值,但当计算sum时有并发修改,就可能无法正常协作了。

*/

public long sum() {

   Cell[] as = cells; Cell a;

   long sum = base;

   if (as != null) {

       for (int i = 0; i < as.length; ++i) {

           if ((a = as[i]) != null)

               sum += a.value;

       }

   }

   return sum;

}


LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效。

因为LongAdder在更新数值时并非对一个数进行更新,而是分散到多个cell,这样在多线程的情况下可以有效的嫌少冲突和压力,使得更加高效。


由于无论是long or double,都是64位。但因没有double型的CAS操作,所以是通过把double型转化

成long型来实现。所以,上面的base和Cell[]变量,位于基类Striped64。英文Striped意为“条带”, 即分片。


使用场景

适用于统计求和计数的场景,因为它提供add、sum方法。


LongAdder是否能够替换AtomicLong

不行,因为AtomicLong提供了很多cas方法,例如getAndIncrement、getAndDecrement等,使用起来非常灵活,而LongAdder只有add和sum,使用较受限。


优点:由于 JVM 会将 64位的double,long 型变量的读操作分为两次32位的读操作,所以低并发保持了 AtomicLong性能,高并发下热点数据被 hash 到多个 Cell,有限分离,通过分散提升了并行度

但统计时有数据更新,也可能会出现数据误差,但高并发场景有限使用此类,低时还是可以继续 AtomicLong


伪共享与缓存行填充

Cell类定义用了注解 Contended,JDK 8新增,涉及伪共享与缓存行填充。


每个CPU都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line (缓存行)。在64位x86架

构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。


如下,主内存中有变量X、Y、 Z (假设每个变量都是一个Long型) ,被CPU1和CPU2分别读入自己的缓

存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,即往总线上发消息,通知CPU2对应Cache Line也失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。


2.png


使用Contended注解,即可实现缓存行填充。不让Cell数组中相邻的元素落到同一个缓存行。


文章知识点与官方知识档案匹配,可进一步学习相关知识


目录
相关文章
|
23天前
|
存储 缓存 安全
java 中操作字符串都有哪些类,它们之间有什么区别
Java中操作字符串的类主要有String、StringBuilder和StringBuffer。String是不可变的,每次操作都会生成新对象;StringBuilder和StringBuffer都是可变的,但StringBuilder是非线程安全的,而StringBuffer是线程安全的,因此性能略低。
42 8
|
23天前
|
Java 开发者
在 Java 中,一个类可以实现多个接口吗?
这是 Java 面向对象编程的一个重要特性,它提供了极大的灵活性和扩展性。
46 1
|
1月前
|
存储 安全 Java
java.util的Collections类
Collections 类位于 java.util 包下,提供了许多有用的对象和方法,来简化java中集合的创建、处理和多线程管理。掌握此类将非常有助于提升开发效率和维护代码的简洁性,同时对于程序的稳定性和安全性有大有帮助。
60 17
|
1月前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
1月前
|
存储 Java 程序员
Java基础的灵魂——Object类方法详解(社招面试不踩坑)
本文介绍了Java中`Object`类的几个重要方法,包括`toString`、`equals`、`hashCode`、`finalize`、`clone`、`getClass`、`notify`和`wait`。这些方法是面试中的常考点,掌握它们有助于理解Java对象的行为和实现多线程编程。作者通过具体示例和应用场景,详细解析了每个方法的作用和重写技巧,帮助读者更好地应对面试和技术开发。
106 4
|
1月前
|
Java 编译器 开发者
Java异常处理的最佳实践,涵盖理解异常类体系、选择合适的异常类型、提供详细异常信息、合理使用try-catch和finally语句、使用try-with-resources、记录异常信息等方面
本文探讨了Java异常处理的最佳实践,涵盖理解异常类体系、选择合适的异常类型、提供详细异常信息、合理使用try-catch和finally语句、使用try-with-resources、记录异常信息等方面,帮助开发者提高代码质量和程序的健壮性。
53 2
|
29天前
|
Java Android开发
Eclipse 创建 Java 类
Eclipse 创建 Java 类
26 0
|
4月前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
62 7
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
26 3
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
35 2