JUC 包中的原子类分5 大类
- AtomicInteger:整形原子类
- AtomicLong:长整型原子类
- AtomicBoolean:布尔型原子类
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class MyNumber { AtomicInteger atomicInteger = new AtomicInteger(); public void addPlusPlus() { atomicInteger.getAndIncrement(); } } /** * @auther zzyy * @create 2022-02-25 21:59 */ public class AtomicIntegerDemo { public static final int SIZE = 50; public static void main(String[] args) throws InterruptedException { MyNumber myNumber = new MyNumber(); CountDownLatch countDownLatch = new CountDownLatch(SIZE); for (int i = 1; i <= SIZE; i++) { new Thread(() -> { try { for (int j = 1; j <= 1000; j++) { myNumber.addPlusPlus(); } } finally { countDownLatch.countDown(); } }, String.valueOf(i)).start(); } //等待上面50个线程全部计算完成后,再去获得最终值 //暂停几秒钟线程 //try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get()); } }
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记位的引用类型
1、使用JDK 1.5开始就提供的AtomicReference类保证对象之间的原子性,把多个变量放到一个对象里面进行CAS操作;
import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import java.util.concurrent.atomic.AtomicReference; @Getter @ToString @AllArgsConstructor class User { String userName; int age; } /** * @auther zzyy * @create 2022-02-24 14:50 */ public class AtomicReferenceDemo { public static void main(String[] args) { AtomicReference<User> atomicReference = new AtomicReference<>(); User z3 = new User("z3", 22); User li4 = new User("li4", 28); atomicReference.set(z3); System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString()); System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString()); } }
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * 题目:实现一个自旋锁,复习CAS思想 * 自旋锁好处:循环比较获取没有类似wait的阻塞。 * * 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现 * 当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。 */ public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void lock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"\t"+"----come in"); while (!atomicReference.compareAndSet(null, thread)) { } } public void unLock() { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread,null); System.out.println(Thread.currentThread().getName()+"\t"+"----task over,unLock..."); } public static void main(String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.lock(); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } spinLockDemo.unLock(); },"A").start(); //暂停500毫秒,线程A先于B启动 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { spinLockDemo.lock(); spinLockDemo.unLock(); },"B").start(); } }
- AtomicIntegerArray:整形数组原子类
- AtomicLongArray:长整形数组原子类
- AtomicReferenceArray:引用类型数组原子类
import java.util.concurrent.atomic.AtomicIntegerArray; public class AtomicIntegerArrayDemo { public static void main(String[] args) { AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]); //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5); //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5}); for (int i = 0; i <atomicIntegerArray.length(); i++) { System.out.println(atomicIntegerArray.get(i)); } System.out.println(); int tmpInt = 0; tmpInt = atomicIntegerArray.getAndSet(0,1122); System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0)); tmpInt = atomicIntegerArray.getAndIncrement(0); System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0)); } }
- AtomicIntegerFieldUpdater:原子更新整形字段的更新器
- AtomicLongFieldUpdater:原子更新长整形字段的更新器
- AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器
①. 使用目的:
②. 使用要求
更新的对象属性必须使用public volatile修饰符
这种原子类型,是抽象类,所以每次使用都必须使用静态方法newUpdater( )创建一个更新器,并且需要设置想要更新的类和属性
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; class BankAccount//资源类 { String bankName = "CCB"; //更新的对象属性必须使用 public volatile 修饰符。 public volatile int money = 0;//钱数 public void add() { money++; } //因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须 // 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。 AtomicIntegerFieldUpdater<BankAccount> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money"); //不加synchronized,保证高性能原子性,局部微创小手术 public void transMoney(BankAccount bankAccount) { fieldUpdater.getAndIncrement(bankAccount); } } /** * @auther zzyy * 以一种线程安全的方式操作非线程安全对象的某些字段。 * * 需求: * 10个线程, * 每个线程转账1000, * 不使用synchronized,尝试使用AtomicIntegerFieldUpdater来实现。 */ public class AtomicIntegerFieldUpdaterDemo { public static void main(String[] args) throws InterruptedException { BankAccount bankAccount = new BankAccount(); CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <=10; i++) { new Thread(() -> { try { for (int j = 1; j <=1000; j++) { //bankAccount.add(); bankAccount.transMoney(bankAccount); } } finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t"+"result: "+bankAccount.money); } }
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; class MyVar //资源类 { public volatile Boolean isInit = Boolean.FALSE; AtomicReferenceFieldUpdater<MyVar,Boolean> referenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit"); public void init(MyVar myVar) { if (referenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE)) { System.out.println(Thread.currentThread().getName()+"\t"+"----- start init,need 2 seconds"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"----- over init"); }else{ System.out.println(Thread.currentThread().getName()+"\t"+"----- 已经有线程在进行初始化工作。。。。。"); } } } /** * @auther zzyy * 需求: * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作, * 要求只能被初始化一次,只有一个线程操作成功 */ public class AtomicReferenceFieldUpdaterDemo { public static void main(String[] args) { MyVar myVar = new MyVar(); for (int i = 1; i <=5; i++) { new Thread(() -> { myVar.init(myVar); },String.valueOf(i)).start(); } } } //打印: D:\App\java1.8\jdk\bin\java.exe "-javaagent:D:\App\IntelliJ IDEA 2020.3.1\lib\idea_rt.jar=57957:D:\App\IntelliJ IDEA 2020.3.1\bin" -Dfile.encoding=GBK -classpath D:\App\java1.8\jdk\jre\lib\charsets.jar;D:\App\java1.8\jdk\jre\lib\deploy.jar;D:\App\java1.8\jdk\jre\lib\ext\access-bridge-64.jar;D:\App\java1.8\jdk\jre\lib\ext\cldrdata.jar;D:\App\java1.8\jdk\jre\lib\ext\dnsns.jar;D:\App\java1.8\jdk\jre\lib\ext\jaccess.jar;D:\App\java1.8\jdk\jre\lib\ext\jfxrt.jar;D:\App\java1.8\jdk\jre\lib\ext\localedata.jar;D:\App\java1.8\jdk\jre\lib\ext\nashorn.jar;D:\App\java1.8\jdk\jre\lib\ext\sunec.jar;D:\App\java1.8\jdk\jre\lib\ext\sunjce_provider.jar;D:\App\java1.8\jdk\jre\lib\ext\sunmscapi.jar;D:\App\java1.8\jdk\jre\lib\ext\sunpkcs11.jar;D:\App\java1.8\jdk\jre\lib\ext\zipfs.jar;D:\App\java1.8\jdk\jre\lib\javaws.jar;D:\App\java1.8\jdk\jre\lib\jce.jar;D:\App\java1.8\jdk\jre\lib\jfr.jar;D:\App\java1.8\jdk\jre\lib\jfxswt.jar;D:\App\java1.8\jdk\jre\lib\jsse.jar;D:\App\java1.8\jdk\jre\lib\management-agent.jar;D:\App\java1.8\jdk\jre\lib\plugin.jar;D:\App\java1.8\jdk\jre\lib\resources.jar;D:\App\java1.8\jdk\jre\lib\rt.jar;D:\devspace\source-bulldozer\JavaConcurrent\target\classes;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.5.8\spring-boot-starter-web-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.8\spring-boot-starter-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot\2.5.8\spring-boot-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.8\spring-boot-autoconfigure-2.5.8.jar;C:\Users\liuya\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.8\spring-boot-starter-logging-2.5.8.jar;C:\Users\liuya\.m2\repository\ch\qos\logback\logback-classic\1.2.9\logback-classic-1.2.9.jar;C:\Users\liuya\.m2\repository\ch\qos\logback\logback-core\1.2.9\logback-core-1.2.9.jar;C:\Users\liuya\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.17.0\log4j-to-slf4j-2.17.0.jar;C:\Users\liuya\.m2\repository\org\apache\logging\log4j\log4j-api\2.17.0\log4j-api- 1 ----- start init,need 2 seconds 3 ----- 已经有线程在进行初始化工作。。。。。 2 ----- 已经有线程在进行初始化工作。。。。。 4 ----- 已经有线程在进行初始化工作。。。。。 5 ----- 已经有线程在进行初始化工作。。。。。 1 ----- over init Process finished with exit code 0
- LongAccumulator:Java 8 中的一个原子累加器类,用于对 long 类型的数值进行原子操作。它是
- LongAdder: jdk8推荐使用LongAdder,在高并发环境下比LongAtomic性能更好(减少乐观锁的重试次数)。
面我们针对LongAdder的常用API进行剖析:但LongAdder 只能做累加且只能从0开始累加
LongAdder longAdder = new LongAdder(); longAdder.increment(); longAdder.increment(); longAdder.increment(); System.out.println(longAdder.sum());
public class LongAccumulatorAPIDemo { public static void main(String[] args) { LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() { @Override public long applyAsLong(long left, long right) { return left + right; } },0); longAccumulator.accumulate(1);//1 longAccumulator.accumulate(3);//4 System.out.println(longAccumulator.get()); } }
import javax.lang.model.element.VariableElement; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; class ClickNumber //资源类 { int number = 0; public synchronized void clickBySynchronized() { number++; } AtomicLong atomicLong = new AtomicLong(0); public void clickByAtomicLong() { atomicLong.getAndIncrement(); } LongAdder longAdder = new LongAdder(); public void clickByLongAdder() { longAdder.increment(); } LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y,0); public void clickByLongAccumulator() { longAccumulator.accumulate(1); } } /** * @auther zzyy * 需求: 50个线程,每个线程100W次,总点赞数出来 */ public class AccumulatorCompareDemo { public static final int _1W = 100000; public static final int threadNumber = 50; public static void main(String[] args) throws InterruptedException { ClickNumber clickNumber = new ClickNumber(); long startTime; long endTime; CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber); CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber); CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber); CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber); startTime = System.currentTimeMillis(); for (int i = 1; i <=threadNumber; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.clickBySynchronized(); } } finally { countDownLatch1.countDown(); } },String.valueOf(i)).start(); } countDownLatch1.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickBySynchronized: "+clickNumber.number); startTime = System.currentTimeMillis(); for (int i = 1; i <=threadNumber; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.clickByAtomicLong(); } } finally { countDownLatch2.countDown(); } },String.valueOf(i)).start(); } countDownLatch2.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByAtomicLong: "+clickNumber.atomicLong.get()); startTime = System.currentTimeMillis(); for (int i = 1; i <=threadNumber; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.clickByLongAdder(); } } finally { countDownLatch3.countDown(); } },String.valueOf(i)).start(); } countDownLatch3.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAdder: "+clickNumber.longAdder.sum()); startTime = System.currentTimeMillis(); for (int i = 1; i <=threadNumber; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.clickByLongAccumulator(); } } finally { countDownLatch4.countDown(); } },String.valueOf(i)).start(); } countDownLatch4.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAccumulator: "+clickNumber.longAccumulator.get()); } }
----costTime: 45106 毫秒 clickBySynchronized: 500000000 ----costTime: 9630 毫秒 clickByAtomicLong: 500000000 ----costTime: 598 毫秒 clickByLongAdder: 500000000 ----costTime: 422 毫秒 clickByLongAccumulator: 500000000
/** Number of CPUS, to place bound on table size 当前计算机CPU数量,Cell数组扩容时会使用到 */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * Table of cells. When non-null, size is a power of 2. */ transient volatile Cell[] cells; /** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. 类似于AtomicLong中全局的value值。再没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上 */ transient volatile long base; /** * Spinlock (locked via CAS) used when resizing and/or creating Cells. 初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁 */ transient volatile int cellsBusy;
2、调用longAccumulate:如果更新base失败后,首次新建一个Cell[ ]数组(默认长度是2)
4、调用longAccumulate:当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容
LongAdder.java public void add(long x) { //as是striped64中的cells数组 //b是striped64中的base //v是当前线程hash到的cell中存储的值 //m是cells的长度减1,hash时作为掩码使用 //a时当前线程hash到的cell Cell[] as; long b, v; int m; Cell a; /** 首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值, 且只有当cas失败时,才会走到if中 条件1:cells不为空,说明出现过竞争,cell[]已创建 条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争 */ if ((as = cells) != null || !casBase(b = base, b + x)) { //true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容 boolean uncontended = true; /* 条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true 会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2 条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现, 条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。 a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理 条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容 (如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true) **/ if (as == null || (m = as.length - 1) < 0 || //getProbe( )方法返回的时线程中的threadLocalRandomProbe字段 //它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它) (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //调用Striped64中的方法处理 longAccumulate(x, null, uncontended); }
CASE2:刚刚初始化Cell[ ]数组(首次新建)
//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组 /* cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁 cells == as == null 是成立的 casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁, 返回true,第一次进来没人抢占cell单元格,肯定返回true **/ else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //是否初始化的标记 boolean init = false; try { // Initialize table(新建cells) // 前面else if中进行了判断,这里再次判断,采用双端检索的机制 if (cells == as) { //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2 Cell[] rs = new Cell[2]; //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1 //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法, //通常都是hash&(table.len-1),同hashmap一个意思 //看这次的value是落在0还是1 rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作 //这种情况是cell中都CAS失败了,有一个兜底的方法 //该分支实现直接操作base基数,将值累加到base上, //也即其他线程正在初始化,多个线程正在更新base的值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了 // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用 if ((a = as[(n - 1) & h]) == null) { //Cell[]数组没有正在扩容 if (cellsBusy == 0) { // Try to attach new Cell //先创建一个Cell Cell r = new Cell(x); // Optimistically create //尝试加锁,加锁后cellsBusy=1 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上 //在有锁的情况下再检测一遍之前的判断 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0;//释放锁 } if (created) break; continue; // Slot is now non-empty } } collide = false; } /** wasUncontended表示cells初始化后,当前线程竞争修改失败 wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true, 紧接着执行advanceProbe(h)重置当前线程的hash,重新循环 */ else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //说明当前线程对应的数组中有了数据,也重置过hash值 //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试 else if (n >= NCPU || cells != as) collide = false; //扩容标识设置为false,标识永远不会再扩容 //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环 else if (!collide) collide = true; //锁状态为0并且将锁状态修改为1(持有锁) else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //按位左移1位来操作,扩容大小为之前容量的两倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) //扩容后将之前数组的元素拷贝到新数组中 rs[i] = as[i]; cells = rs; } } finally { //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); }
LongAdder#sum( )会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }