Java8原子弹类之LongAdder源码分析

简介: JDK 8开始,针对Long型的原子操作, Java又提供了LongAdder. LongAccumulator; 针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。

Java8原子弹类之LongAdder源码分析

JDK 8开始,针对Long型的原子操作, Java又提供了LongAdder. LongAccumulator; 针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。


Striped64 UML


5.png


AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发场景下仍不够快,若再提高性能,咋办?

把一个变量拆成多份,变为多个变量,类似ConcurrentHashMap分段锁。如下图,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量。当多个线程并发累加的时:


如果并发度低,就直接加到base变量上

并发度高,冲突大,平摊到这些Cell上

最后取值时,再把base和这些Cell求sum运算。

4.png



核心 API

add

3.png


public void add(long x) {

   Cell[] as; long b, v; int m; Cell a;

   // 判断cells是否还没被初始化,并且尝试对value值进行cas操作

   if ((as = cells) != null || !casBase(b = base, b + x)) {

       // 如果cells已经初始化 或 cas操作失败,则运行if内部语句

       boolean uncontended = true;

       // cell[]数组是否初始化

       // cell[]数组虽然初始化了但是数组长度是否为0

      // 该线程所对应cell是否为null

      // 尝试对该线程对应的cell单元进行cas更新是否失败,如果这些条件有一条为true,则运行最为核心的方法longAccumulate

       if (as == null || (m = as.length - 1) < 0 ||

           (a = as[getProbe() & m]) == null ||

           !(uncontended = a.cas(v = a.value, v + x)))

           longAccumulate(x, null, uncontended);

   }

}


包含一个Cell数组,Striped64的一个内部类。即AtomicLong的填充变体且只支持原始访问和CAS,有一个value变量,并且提供cas方法更新value值。


/**

 * 处理涉及初始化,调整大小,创建新Cell,和/或争用的更新案例

 *

 * @param x 值

 * @param fn 更新方法

 * @param wasUncontended 调用

 */

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

    int h;

    // 获取线程probe的值

    if ((h = getProbe()) == 0) {

        // 值为0,则强制初始化

        ThreadLocalRandom.current();

        h = getProbe();

        wasUncontended = true;

    }

    boolean collide = false; // True if last slot nonempt

    for (;;) {

        Cell[] as; Cell a; int n; long v;

        // 这个if分支处理上述四个条件中的前两个

        // 此时cells数组已初始化 && 长度大于0

        if ((as = cells) != null && (n = as.length) > 0) {

            // 线程对应cell为null

            if ((a = as[(n - 1) & h]) == null) {

                // 若busy锁未被占有

                if (cellsBusy == 0) {       // Try to attach new Cell

                    // 新建一个cell

                    Cell r = new Cell(x);   // Optimistically create

                    // 检测busy是否为0,并且尝试锁busy

                    if (cellsBusy == 0 && casCellsBusy()) {

                        boolean created = false;

                        try { // Recheck under lock

                            Cell[] rs; int m, j;

                            // 再次确认线程probe所对应的cell为null,将新建的cell赋值

                            if ((rs = cells) != null &&

                                (m = rs.length) > 0 &&

                                rs[j = (m - 1) & h] == null) {

                                rs[j] = r;

                                created = true;

                            }

                        } finally {

                            // 解锁

                            cellsBusy = 0;

                        }

                        if (created)

                            break;

                        //如果失败,再次尝试

                        continue;           // Slot is now non-empty

                    }

                }

                collide = false;

            }

            //置为true后交给循环重试

            else if (!wasUncontended)       // CAS already known to fail

                wasUncontended = true;      // Continue after rehash

            //尝试给线程对应的cell update

            else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                         fn.applyAsLong(v, x))))

                break;

            else if (n >= NCPU || cells != as)

                collide = false;            // At max size or stale

            else if (!collide)

                collide = true;

            //在以上条件都无法解决的情况下尝试扩展cell

            else if (cellsBusy == 0 && casCellsBusy()) {

                try {

                    if (cells == as) {      // Expand table unless stale

                        Cell[] rs = new Cell[n << 1];

                        for (int i = 0; i < n; ++i)

                            rs[i] = as[i];

                        cells = rs;

                    }

                } finally {

                    cellsBusy = 0;

                }

                collide = false;

                continue;                   // Retry with expanded table

            }

            h = advanceProbe(h);

        }

        //此时cells还未进行第一次初始化,进行初始化

        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

            boolean init = false;

            try {                           // Initialize table

                if (cells == as) {

                    Cell[] rs = new Cell[2];

                    rs[h & 1] = new Cell(x);

                    cells = rs;

                    init = true;

                }

            } finally {

                cellsBusy = 0;

            }

            if (init)

                break;

        }

        //busy锁不成功或者忙,则再重试一次casBase对value直接累加

        else if (casBase(v = base, ((fn == null) ? v + x :

                                    fn.applyAsLong(v, x))))

            break;                          // Fall back on using base

    }

}

 /**

  * Spinlock (locked via CAS) used when resizing and/or creating Cells.

  * 通过cas实现的自旋锁,用于扩大或者初始化cells

  */

 transient volatile int cellsBusy;



从以上分析来看,longAccumulate就是为了尽量减少多个线程更新同一个value,实在不行则扩大cell


SUM(最终一致性问题)

并没有对 cells 数组加锁,所以是最终一致性,而非强一致性。类似 concurrenthashmap#clear(一边执行清空操作,一边还有线程放入数据,clear调用完完毕后再读取)。因此,适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景。


/**

* Returns the current sum.  返回值不是个原子快照;无并发修改的调用可以返回精确值,但当计算sum时有并发修改,就可能无法正常协作了。

*/

public long sum() {

   Cell[] as = cells; Cell a;

   long sum = base;

   if (as != null) {

       for (int i = 0; i < as.length; ++i) {

           if ((a = as[i]) != null)

               sum += a.value;

       }

   }

   return sum;

}


LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效。

因为LongAdder在更新数值时并非对一个数进行更新,而是分散到多个cell,这样在多线程的情况下可以有效的嫌少冲突和压力,使得更加高效。


由于无论是long or double,都是64位。但因没有double型的CAS操作,所以是通过把double型转化

成long型来实现。所以,上面的base和Cell[]变量,位于基类Striped64。英文Striped意为“条带”, 即分片。


使用场景

适用于统计求和计数的场景,因为它提供add、sum方法。


LongAdder是否能够替换AtomicLong

不行,因为AtomicLong提供了很多cas方法,例如getAndIncrement、getAndDecrement等,使用起来非常灵活,而LongAdder只有add和sum,使用较受限。


优点:由于 JVM 会将 64位的double,long 型变量的读操作分为两次32位的读操作,所以低并发保持了 AtomicLong性能,高并发下热点数据被 hash 到多个 Cell,有限分离,通过分散提升了并行度

但统计时有数据更新,也可能会出现数据误差,但高并发场景有限使用此类,低时还是可以继续 AtomicLong


伪共享与缓存行填充

Cell类定义用了注解 Contended,JDK 8新增,涉及伪共享与缓存行填充。


每个CPU都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line (缓存行)。在64位x86架

构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。


如下,主内存中有变量X、Y、 Z (假设每个变量都是一个Long型) ,被CPU1和CPU2分别读入自己的缓

存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,即往总线上发消息,通知CPU2对应Cache Line也失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。


2.png


使用Contended注解,即可实现缓存行填充。不让Cell数组中相邻的元素落到同一个缓存行。


文章知识点与官方知识档案匹配,可进一步学习相关知识


目录
相关文章
|
1天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
14 6
|
3天前
|
IDE Java 编译器
Java:如何确定编译和运行时类路径是否一致
类路径(Classpath)是JVM用于查找类文件的路径列表,对编译和运行Java程序至关重要。编译时通过`javac -classpath`指定,运行时通过`java -classpath`指定。IDE如Eclipse和IntelliJ IDEA也提供界面管理类路径。确保编译和运行时类路径一致,特别是外部库和项目内部类的路径设置。
|
2天前
|
安全 Java 测试技术
Java零基础-StringBuffer 类详解
【10月更文挑战第9天】Java零基础教学篇,手把手实践教学!
10 2
|
3天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。
【10月更文挑战第14天】从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。本文深入探讨Set的核心理念,并通过示例代码展示了HashSet和TreeSet的特点和应用场景。
9 2
|
3天前
|
存储 Java 索引
Java 中集合框架的常见接口和类
【10月更文挑战第13天】这些只是集合框架中的一部分常见接口和类,还有其他一些如 Queue、Deque 等接口以及相关的实现类。理解和掌握这些集合的特点和用法对于高效编程非常重要。
|
8天前
|
小程序 Oracle Java
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
这篇文章是关于JVM基础知识的介绍,包括JVM的跨平台和跨语言特性、Class文件格式的详细解析,以及如何使用javap和jclasslib工具来分析Class文件。
21 0
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
|
9天前
|
Java
Java基础之 JDK8 HashMap 源码分析(中间写出与JDK7的区别)
这篇文章详细分析了Java中HashMap的源码,包括JDK8与JDK7的区别、构造函数、put和get方法的实现,以及位运算法的应用,并讨论了JDK8中的优化,如链表转红黑树的阈值和扩容机制。
13 1
|
4天前
|
安全 Java UED
Java中的多线程编程:从基础到实践
本文深入探讨了Java中的多线程编程,包括线程的创建、生命周期管理以及同步机制。通过实例展示了如何使用Thread类和Runnable接口来创建线程,讨论了线程安全问题及解决策略,如使用synchronized关键字和ReentrantLock类。文章还涵盖了线程间通信的方式,包括wait()、notify()和notifyAll()方法,以及如何避免死锁。此外,还介绍了高级并发工具如CountDownLatch和CyclicBarrier的使用方法。通过综合运用这些技术,可以有效提高多线程程序的性能和可靠性。
|
4天前
|
缓存 Java UED
Java中的多线程编程:从基础到实践
【10月更文挑战第13天】 Java作为一门跨平台的编程语言,其强大的多线程能力一直是其核心优势之一。本文将从最基础的概念讲起,逐步深入探讨Java多线程的实现方式及其应用场景,通过实例讲解帮助读者更好地理解和应用这一技术。
22 3
|
8天前
|
Java 调度 UED
深入理解Java中的多线程与并发机制
本文将详细探讨Java中多线程的概念、实现方式及并发机制,包括线程的生命周期、同步与锁机制以及高级并发工具。通过实例代码演示,帮助读者理解如何在Java中有效地处理多线程和并发问题,提高程序的性能和响应能力。