多线程进阶学习08------CAS与原子类详解(2)

简介: 多线程进阶学习08------CAS与原子类详解(2)

JUC 包中的原子类分5 大类

基本原子类

使用原子的方式更新基本类型

  • AtomicInteger:整形原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();
    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
}
/**
 * @auther zzyy
 * @create 2022-02-25 21:59
 */
public class AtomicIntegerDemo {
    public static final int SIZE = 50;
    public static void main(String[] args) throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }
        //等待上面50个线程全部计算完成后,再去获得最终值
        //暂停几秒钟线程
        //try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get());
    }
}

引用原子类

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicMarkableReference :原子更新带有标记位的引用类型

上面的普通原子类仅仅只能保证一个共享变量的原子操作,这个问题你可能已经知道怎么解决了。有两种解决方案:

1、使用JDK 1.5开始就提供的AtomicReference类保证对象之间的原子性,把多个变量放到一个对象里面进行CAS操作;

2、使用锁。锁内的临界区代码可以保证只有当前线程能操作。

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
@Getter
@ToString
@AllArgsConstructor
class User {
    String userName;
    int age;
}
/**
 * @auther zzyy
 * @create 2022-02-24 14:50
 */
public class AtomicReferenceDemo {
    public static void main(String[] args) {
        AtomicReference<User> atomicReference = new AtomicReference<>();
        User z3 = new User("z3", 22);
        User li4 = new User("li4", 28);
        atomicReference.set(z3);
        System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
        System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
    }
}

传入线程Thread泛型,手写自旋锁:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * 题目:实现一个自旋锁,复习CAS思想
 * 自旋锁好处:循环比较获取没有类似wait的阻塞。
 *
 * 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
 * 当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。
 */
public class SpinLockDemo
{
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
    public void lock()
    {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
        while (!atomicReference.compareAndSet(null, thread)) {
        }
    }
    public void unLock()
    {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread,null);
        System.out.println(Thread.currentThread().getName()+"\t"+"----task over,unLock...");
    }
    public static void main(String[] args)
    {
        SpinLockDemo spinLockDemo = new SpinLockDemo();
        new Thread(() -> {
            spinLockDemo.lock();
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            spinLockDemo.unLock();
        },"A").start();
        //暂停500毫秒,线程A先于B启动
        try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
        new Thread(() -> {
            spinLockDemo.lock();
            spinLockDemo.unLock();
        },"B").start();
    }
}

数组原子类

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray:引用类型数组原子类
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayDemo
{
    public static void main(String[] args)
    {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
        for (int i = 0; i <atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        int tmpInt = 0;
        tmpInt = atomicIntegerArray.getAndSet(0,1122);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
        tmpInt = atomicIntegerArray.getAndIncrement(0);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
    }
}

对象的属性修改原子类

  • AtomicIntegerFieldUpdater:原子更新整形字段的更新器
  • AtomicLongFieldUpdater:原子更新长整形字段的更新器
  • AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器

①. 使用目的:

以一种线程安全的方式操作非线程安全对象内的某些字段

是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,以达到精确加锁+节约内存的目的

②. 使用要求

更新的对象属性必须使用public volatile修饰符

这种原子类型,是抽象类,所以每次使用都必须使用静态方法newUpdater( )创建一个更新器,并且需要设置想要更新的类和属性

AtomicIntegerFieldUpdater

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
class BankAccount//资源类
{
    String bankName = "CCB";
    //更新的对象属性必须使用 public volatile 修饰符。
    public volatile int money = 0;//钱数
    public void add()
    {
        money++;
    }
    //因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须
    // 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
    AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
            AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
    //不加synchronized,保证高性能原子性,局部微创小手术
    public void transMoney(BankAccount bankAccount)
    {
        fieldUpdater.getAndIncrement(bankAccount);
    }
}
/**
 * @auther zzyy
 * 以一种线程安全的方式操作非线程安全对象的某些字段。
 *
 * 需求:
 * 10个线程,
 * 每个线程转账1000,
 * 不使用synchronized,尝试使用AtomicIntegerFieldUpdater来实现。
 */
public class AtomicIntegerFieldUpdaterDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <=10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=1000; j++) {
                        //bankAccount.add();
                        bankAccount.transMoney(bankAccount);
                    }
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+bankAccount.money);
    }
}

AtomicReferenceFieldUpdater

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
class MyVar //资源类
{
    public volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<MyVar,Boolean> referenceFieldUpdater =
            AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");
    public void init(MyVar myVar)
    {
        if (referenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE))
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"----- start init,need 2 seconds");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"----- over init");
        }else{
            System.out.println(Thread.currentThread().getName()+"\t"+"----- 已经有线程在进行初始化工作。。。。。");
        }
    }
}
/**
 * @auther zzyy
 * 需求:
 * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,
 * 要求只能被初始化一次,只有一个线程操作成功
 */
public class AtomicReferenceFieldUpdaterDemo
{
    public static void main(String[] args)
    {
        MyVar myVar = new MyVar();
        for (int i = 1; i <=5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            },String.valueOf(i)).start();
        }
    }
}
//打印:
D:\App\java1.8\jdk\bin\java.exe "-javaagent:D:\App\IntelliJ IDEA 2020.3.1\lib\idea_rt.jar=57957:D:\App\IntelliJ IDEA 2020.3.1\bin" -Dfile.encoding=GBK -classpath D:\App\java1.8\jdk\jre\lib\charsets.jar;D:\App\java1.8\jdk\jre\lib\deploy.jar;D:\App\java1.8\jdk\jre\lib\ext\access-bridge-64.jar;D:\App\java1.8\jdk\jre\lib\ext\cldrdata.jar;D:\App\java1.8\jdk\jre\lib\ext\dnsns.jar;D:\App\java1.8\jdk\jre\lib\ext\jaccess.jar;D:\App\java1.8\jdk\jre\lib\ext\jfxrt.jar;D:\App\java1.8\jdk\jre\lib\ext\localedata.jar;D:\App\java1.8\jdk\jre\lib\ext\nashorn.jar;D:\App\java1.8\jdk\jre\lib\ext\sunec.jar;D:\App\java1.8\jdk\jre\lib\ext\sunjce_provider.jar;D:\App\java1.8\jdk\jre\lib\ext\sunmscapi.jar;D:\App\java1.8\jdk\jre\lib\ext\sunpkcs11.jar;D:\App\java1.8\jdk\jre\lib\ext\zipfs.jar;D:\App\java1.8\jdk\jre\lib\javaws.jar;D:\App\java1.8\jdk\jre\lib\jce.jar;D:\App\java1.8\jdk\jre\lib\jfr.jar;D:\App\java1.8\jdk\jre\lib\jfxswt.jar;D:\App\java1.8\jdk\jre\lib\jsse.jar;D:\App\java1.8\jdk\jre\lib\management-agent.jar;D:\App\java1.8\jdk\jre\lib\plugin.jar;D:\App\java1.8\jdk\jre\lib\resources.jar;D:\App\java1.8\jdk\jre\lib\rt.jar;D:\devspace\source-bulldozer\JavaConcurrent\target\classes;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.5.8\spring-boot-starter-web-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.8\spring-boot-starter-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot\2.5.8\spring-boot-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.8\spring-boot-autoconfigure-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.8\spring-boot-starter-logging-2.5.8.jar;C:\Users\liuya\.m2\repository\ch\qos\logback\logback-classic\1.2.9\logback-classic-1.2.9.jar;C:\Users\liuya\.m2\repository\ch\qos\logback\logback-core\1.2.9\logback-core-1.2.9.jar;C:\Users\liuya\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.17.0\log4j-to-slf4j-2.17.0.jar;C:\Users\liuya\.m2\repository\org\apache\logging\log4j\log4j-api\2.17.0\log4j-api-
1 ----- start init,need 2 seconds
3 ----- 已经有线程在进行初始化工作。。。。。
2 ----- 已经有线程在进行初始化工作。。。。。
4 ----- 已经有线程在进行初始化工作。。。。。
5 ----- 已经有线程在进行初始化工作。。。。。
1 ----- over init
Process finished with exit code 0

原子操作增强原子类Java8

  • LongAccumulator:Java 8 中的一个原子累加器类,用于对 long 类型的数值进行原子操作。它是 java.util.concurrent.atomic 包中的一员,是 LongAdder 的增强版。
  • LongAdder: jdk8推荐使用LongAdder,在高并发环境下比LongAtomic性能更好(减少乐观锁的重试次数)。

在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。

但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈。这就是LongAdder引入的初衷------解决高并发环境下AtomictLong的自旋瓶颈问题。

问题引出:i++的多线程不安全情景演进

LongAdder

面我们针对LongAdder的常用API进行剖析:但LongAdder 只能做累加且只能从0开始累加

 LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.sum());

LongAccumulator

public class LongAccumulatorAPIDemo
{
    public static void main(String[] args)
    {
        LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator()
        {
            @Override
            public long applyAsLong(long left, long right)
            {
                return left + right;
            }
        },0);
        longAccumulator.accumulate(1);//1
        longAccumulator.accumulate(3);//4
        System.out.println(longAccumulator.get());
    }
}

高并发点赞案例性能对比

import javax.lang.model.element.VariableElement;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
class ClickNumber //资源类
{
    int number = 0;
    public synchronized void clickBySynchronized()
    {
        number++;
    }
    AtomicLong atomicLong = new AtomicLong(0);
    public void clickByAtomicLong()
    {
        atomicLong.getAndIncrement();
    }
    LongAdder longAdder = new LongAdder();
    public void clickByLongAdder()
    {
        longAdder.increment();
    }
    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y,0);
    public void clickByLongAccumulator()
    {
        longAccumulator.accumulate(1);
    }
}
/**
 * @auther zzyy
 * 需求: 50个线程,每个线程100W次,总点赞数出来
 */
public class AccumulatorCompareDemo
{
    public static final int _1W = 100000;
    public static final int threadNumber = 50;
    public static void main(String[] args) throws InterruptedException
    {
        ClickNumber clickNumber = new ClickNumber();
        long startTime;
        long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);
        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickBySynchronized();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickBySynchronized: "+clickNumber.number);
        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByAtomicLong();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByAtomicLong: "+clickNumber.atomicLong.get());
        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByLongAdder();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAdder: "+clickNumber.longAdder.sum());
        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByLongAccumulator();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAccumulator: "+clickNumber.longAccumulator.get());
    }
}

控制台打印

----costTime: 45106 毫秒   clickBySynchronized: 500000000
----costTime: 9630 毫秒  clickByAtomicLong: 500000000
----costTime: 598 毫秒   clickByLongAdder: 500000000
----costTime: 422 毫秒   clickByLongAccumulator: 500000000

LongAdder原理解析

在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈。这就是LongAdder引入的初衷------解决高并发环境下AtomictLong的自旋瓶颈问题。

bf6b024850454ff49691d33eaeeac9ce.png

Striped64类的结构分析

Striped64的全局变量

    /** Number of CPUS, to place bound on table size 
    当前计算机CPU数量,Cell数组扩容时会使用到
    */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;
    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     类似于AtomicLong中全局的value值。再没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
     */
    transient volatile long base;
    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
     */
    transient volatile int cellsBusy;

Striped64分散热点技术剖析

5f4486c5b5af4eab88ff6031d18f9263.png

LongAdder#add

1、最初无竞争时,直接通过casBase进行更新base的处理,跳过if,当casBase比较激烈,则进入if判断

2、调用longAccumulate:如果更新base失败后,首次新建一个Cell[ ]数组(默认长度是2)

3、调用longAccumulate:如果Cell数组当中的某一个槽位为空

4、调用longAccumulate:当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容

5661dad1ecbb4adc9f125b8ae3ce6ff3.png

 LongAdder.java
  public void add(long x) {
    //as是striped64中的cells数组
    //b是striped64中的base
    //v是当前线程hash到的cell中存储的值
    //m是cells的长度减1,hash时作为掩码使用
    //a时当前线程hash到的cell
        Cell[] as; long b, v; int m; Cell a;
    /**
    首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
    且只有当cas失败时,才会走到if中
    条件1:cells不为空,说明出现过竞争,cell[]已创建
    条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
    */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
      //true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容
            boolean uncontended = true;
      /*
      条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
          会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
      条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
      条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
          a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
      条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
          (如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
      **/
            if (as == null || (m = as.length - 1) < 0 ||
          //getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
        //它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
        //调用Striped64中的方法处理
                longAccumulate(x, null, uncontended);
        }

556b12b629534a83a60e41c422f830a0.png

CASE2:刚刚初始化Cell[ ]数组(首次新建)

//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
  /*
  cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
  cells == as == null  是成立的
  casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,
  返回true,第一次进来没人抢占cell单元格,肯定返回true
  **/
  else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
      //是否初始化的标记
    boolean init = false;
    try {                           // Initialize table(新建cells)
      // 前面else if中进行了判断,这里再次判断,采用双端检索的机制
      if (cells == as) {
        //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
        Cell[] rs = new Cell[2];
        //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
        //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,
        //通常都是hash&(table.len-1),同hashmap一个意思
        //看这次的value是落在0还是1
        rs[h & 1] = new Cell(x);
        cells = rs;
        init = true;
      }
    } finally {
      cellsBusy = 0;
    }
    if (init)
      break;
  }

CASE3:兜底(多个线程尝试CAS修改失败的线程会走这个分支)

//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
  //这种情况是cell中都CAS失败了,有一个兜底的方法
  //该分支实现直接操作base基数,将值累加到base上,
  //也即其他线程正在初始化,多个线程正在更新base的值
  else if (casBase(v = base, ((fn == null) ? v + x :
                fn.applyAsLong(v, x))))
    break;     

CASE1:Cell数组不再为空且可能存在Cell数组扩容

for (;;) {
  Cell[] as; Cell a; int n; long v;
  if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
      // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
    if ((a = as[(n - 1) & h]) == null) {
      //Cell[]数组没有正在扩容
      if (cellsBusy == 0) {       // Try to attach new Cell
        //先创建一个Cell
        Cell r = new Cell(x);   // Optimistically create
        //尝试加锁,加锁后cellsBusy=1
        if (cellsBusy == 0 && casCellsBusy()) { 
          boolean created = false;
          try {               // Recheck under lock
            Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
            //在有锁的情况下再检测一遍之前的判断 
            if ((rs = cells) != null &&
              (m = rs.length) > 0 &&
              rs[j = (m - 1) & h] == null) {
              rs[j] = r;
              created = true;
            }
          } finally {
            cellsBusy = 0;//释放锁
          }
          if (created)
            break;
          continue;           // Slot is now non-empty
        }
      }
      collide = false;
    }
    /**
    wasUncontended表示cells初始化后,当前线程竞争修改失败
    wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
    紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
    */
    else if (!wasUncontended)       // CAS already known to fail
      wasUncontended = true;      // Continue after rehash
    //说明当前线程对应的数组中有了数据,也重置过hash值
    //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                   fn.applyAsLong(v, x))))
      break;
    //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
    else if (n >= NCPU || cells != as)
      collide = false;    //扩容标识设置为false,标识永远不会再扩容
    //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
    else if (!collide) 
      collide = true;
    //锁状态为0并且将锁状态修改为1(持有锁) 
    else if (cellsBusy == 0 && casCellsBusy()) { 
      try {
        if (cells == as) {      // Expand table unless stale
          //按位左移1位来操作,扩容大小为之前容量的两倍
          Cell[] rs = new Cell[n << 1];
          for (int i = 0; i < n; ++i)
            //扩容后将之前数组的元素拷贝到新数组中
            rs[i] = as[i];
          cells = rs; 
        }
      } finally {
        //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
        cellsBusy = 0;
      }
      collide = false;
      continue;                   // Retry with expanded table
    }
    h = advanceProbe(h);
  }

LongAdder#sum

LongAdder#sum( )会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

  public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

为啥高并发下sum的值不精确?sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性,它是最终一致性的

●首先,最终返回的sum局部变量,初始被赋值为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致

●其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法只是在没有并发的情况下,可以获得正确的结果


相关文章
|
2月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
134 6
【Java学习】多线程&JUC万字超详解
|
6月前
|
算法 安全 Java
Java多线程基础-12:详解CAS算法
CAS(Compare and Swap)算法是一种无锁同步原语,用于在多线程环境中更新内存位置的值。
71 0
|
1月前
|
安全
【多线程】CAS、ABA问题详解
【多线程】CAS、ABA问题详解
21 0
|
5月前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
127 0
|
6月前
|
安全 Java 编译器
Java 多线程系列Ⅴ(常见锁策略+CAS+synchronized原理)
Java 多线程系列Ⅴ(常见锁策略+CAS+synchronized原理)
|
6月前
|
安全 Java
多线程(CAS, ABA问题, Runnable & Callable & 僵尸线程 & 孤儿进程)
多线程(CAS, ABA问题, Runnable & Callable & 僵尸线程 & 孤儿进程)
60 1
|
5月前
|
调度 Python
Python多线程学习优质方法分享
Python多线程学习优质方法分享
26 0
|
5月前
|
安全 API C++
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
187 0
|
5月前
|
Java
Java线程学习经典例子-读写者演示
Java线程学习经典例子-读写者演示
23 0
|
6月前
|
Java 调度
【JAVA学习之路 | 提高篇】线程的通信
【JAVA学习之路 | 提高篇】线程的通信