原子操作类概述
Java中提供了一些原子操作类,用于实现多线程环境下的数据同步问题。其中最常见的有以下几种:
1. AtomicBoolean:原子操作布尔型变量。
2. AtomicInteger:原子操作整型变量。
3. AtomicLong:原子操作长整型变量。
4. AtomicReference:原子操作引用类型变量。
5. AtomicStampedReference:原子操作带有版本号的引用类型变量。
这些原子操作类都提供了一些线程安全的方法,比如get、set、compareAndSet等,可以保证数据的原子性操作,避免了多线程环境下的数据竞争问题。
多线程环境不使用原子类保证线程安全i(基本数据类型)
1. public class AtomicIntegerTest { 2. 3. volatile int num=0; 4. 5. public int getNumber(){ 6. return num; 7. } 8. 9. public synchronized void addNumber(){ 10. num++; 11. } 12. }
多线程环境使用原子类保证线程安全i++(基本数据类型)
1. public class AtomicIntegerTest { 2. 3. AtomicInteger atomicInteger = new AtomicInteger(0); 4. 5. public int getNum(){ 6. return atomicInteger.get(); 7. } 8. 9. public void addNum(){ 10. atomicInteger.getAndIncrement(); 11. } 12. }
volatile解决多线程内存不可见问题对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。也就是无法保证原子性。
如果是count++操作,使用如下类实现:
AtomicInteger count = new AtomicInteger();
count.addAndGet(1);
如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)。
代码实战 (AtomicInteger)
AtomicInteger 常用API操作
1. public class AtomicIntegerTest { 2. public static void main(String[] args) throws InterruptedException { 3. AtomicInteger atomicInteger = new AtomicInteger(0); 4. CountDownLatch latch = new CountDownLatch(50); 5. for(int i=0;i<50;i++){ 6. new Thread(()->{ 7. try { 8. for(int j=0;j<10000;j++){ 9. atomicInteger.getAndIncrement(); 10. } 11. }finally { 12. latch.countDown(); 13. } 14. },""+i).start(); 15. } 16. latch.await(); 17. System.out.println("最终结果"+atomicInteger.get()); 18. } 19. }
代码实战(数组类型原子类 )
AtomicIntegerArray是一个原子性的数组,它提供了一系列线程安全的方法。
1. AtomicIntegerArray(int length):创建一个给定长度的AtomicIntegerArray。
2. AtomicIntegerArray(int[] array):用一个给定的数组创建一个AtomicIntegerArray。
3. int length():返回AtomicIntegerArray的长度。
4. int get(int index):获取给定索引处的元素的值。
5. void set(int index, int newValue):设置给定索引处的元素为指定值。
6. void lazySet(int index, int newValue):设置给定索引处的元素为指定值,但不保证立即生效。
7. int getAndSet(int index, int newValue):设置给定索引处的元素为指定值,并返回先前的值。
8. boolean compareAndSet(int index, int expect, int update):如果当前值等于期望值,则将其设置为新值,并返回true。
9. boolean weakCompareAndSet(int index, int expect, int update):如果当前值等于期望值,则将其设置为新值,并返回true。在竞争高的情况下,此方法比compareAndSet更有效。
10. int getAndIncrement(int index):返回原始值并将给定索引处的元素增加1。
11. int getAndDecrement(int index):返回原始值并将给定索引处的元素减少1。
12. int getAndAdd(int index, int delta):返回原始值,并将给定索引处的元素增加给定的值。
13. void addAndGet(int index, int delta):将给定的值添加到给定索引处的元素,并返回新值。
14. boolean weakCompareAndSetAcquire(int index, int expect, int update):这是一个JDK13及以上版本的API,在Java11和12中不存在。这个方法和weakCompareAndSet方法一样,增加了一个内存屏障,即强制对该操作执行ACQUIRE半屏障,可用于加锁或者线程同步。
15. boolean weakCompareAndSetRelease(int index, int expect, int update):这是一个JDK13及以上版本的API,在Java11和12中不存在。这个方法和weakCompareAndSet方法一样,增加了一个内存屏障,即强制对该操作执行RELEASE半屏障,可用于解锁或者线程同步。
1. public class AtomicIntegerArrayTest { 2. public static void main(String[] args) { 3. AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]); 4. for (int i=0;i<atomicIntegerArray.length();i++){ 5. System.out.println(atomicIntegerArray.get(i)); 6. } 7. AtomicIntegerArray array = new AtomicIntegerArray(new int[]{1,2,3,4,5}); 8. array.getAndAdd(0, 999); 9. System.out.println(array.get(0)); 10. array.getAndIncrement(4); 11. System.out.println(array.get(4)); 12. } 13. }
代码实战(引用类型原子类)
AtomicStampedReference携带版本号的引用类型原子类,可以解决ABA问题。解决修改过几次。状态戳原子引用AtomicMarkableReference原子更新带有标记位的引用类型对象。它的定义就是将状态戳简化为true/false。解决是否修改过,类似一次性筷子。状态戳(true/false)原子引用
1. public class AtomicMarkableReferenceTest { 2. public static void main(String[] args) { 3. AtomicMarkableReference<Integer> reference = new AtomicMarkableReference<>(1,false); 4. new Thread(()->{ 5. boolean marked = reference.isMarked(); 6. Integer integer = reference.getReference(); 7. try { 8. TimeUnit.SECONDS.sleep(1); 9. } catch (InterruptedException e) { 10. e.printStackTrace(); 11. } 12. reference.compareAndSet(integer,99,marked,!marked); 13. },"A").start(); 14. new Thread(()->{ 15. boolean marked = reference.isMarked(); 16. Integer integer = reference.getReference(); 17. try { 18. TimeUnit.SECONDS.sleep(2); 19. } catch (InterruptedException e) { 20. e.printStackTrace(); 21. } 22. System.out.println("B线程修改结果:"+reference.compareAndSet(integer, 66, marked, !marked)); 23. System.out.println("最终标志位:"+reference.isMarked()+",最终结果:"+reference.getReference()); 24. },"B").start(); 25. } 26. }
代码实战(对象的属性修改原子类)
以一种线程安全的方式操作非线程安全对象内的某些字段
AtomiclntegerFieldUpdater:原子更新对象中int类型字段的值,基于反射的实用程序,可对指定类的指定volatile int字段进行原子更新。
AtomicLongFieldUpdater:原子更新对象中Long类型字段的值,基于反射的实用程序,可以对指定类的指定volatile long字段进行原子更新。
AtomicReferenceFieldUpdater:原子更新引用类型字段的值,基于反射的实用程序,可以对指定类的指定volatile引用字段进行原子更新。
使用要求:
更新的对象属性必须使用volatile修饰符。
因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomiclntegerFieldUpdater代码
1. public class AtomicIntegerFieldUpdaterTest { 2. 3. public static void main(String[] args) { 4. int size = 50; 5. CountDownLatch latch = new CountDownLatch(size); 6. Dog dog = new Dog(); 7. for (int i = 0; i < 50; i++) { 8. new Thread(() -> { 9. try { 10. for (int j = 0; j < 10000; j++) { 11. dog.add(); 12. } 13. } finally { 14. latch.countDown(); 15. } 16. }, String.valueOf(i)).start(); 17. } 18. try { 19. latch.await(1, TimeUnit.SECONDS); 20. } catch (InterruptedException e) { 21. e.printStackTrace(); 22. } 23. if (latch.getCount() > 0) { 24. System.out.println("============执行超时============"); 25. return; 26. } 27. System.out.println(dog.age); 28. } 29. 30. static class Dog { 31. private volatile int age; 32. private String name; 33. 34. AtomicIntegerFieldUpdater<Dog> updater = AtomicIntegerFieldUpdater.newUpdater(Dog.class, "age"); 35. 36. public void add() { 37. updater.getAndIncrement(this); 38. } 39. } 40. }
AtomicReferenceFieldUpdater代码
1. public class AtomicReferenceFieldUpdaterTest { 2. public static void main(String[] args) { 3. Fish fish = new Fish(); 4. for (int i = 0; i < 5; i++) { 5. new Thread(() -> { 6. fish.init(); 7. }).start(); 8. } 9. } 10. 11. static class Fish { 12. volatile Boolean isLive = Boolean.FALSE; 13. AtomicReferenceFieldUpdater<Fish, Boolean> updater = AtomicReferenceFieldUpdater.newUpdater(Fish.class, Boolean.class, "isLive"); 14. 15. public void init() { 16. if (updater.compareAndSet(this, Boolean.FALSE, Boolean.TRUE)) { 17. System.out.println("初始化的线程:" + Thread.currentThread().getName()); 18. } else { 19. System.out.println("==========已被其他线程初始化============="); 20. } 21. } 22. } 23. }
原子操作增强类(LongAdder LongAccumulator)
LongAdder只能用来计算加减法,且从零开始计算
LongAccumulator提供了自定义的函数操作
1. public class LongAdderTest { 2. public static void main(String[] args) { 3. LongAdder longAdder = new LongAdder(); 4. longAdder.add(3L); 5. longAdder.add(5L); 6. longAdder.increment(); 7. System.out.println(longAdder.sum()); 8. longAdder.decrement(); 9. System.out.println(longAdder.sum()); 10. 11. LongAccumulator longAccumulator = new LongAccumulator((x,y)->x+y,0); 12. longAccumulator.accumulate(6); 13. longAccumulator.accumulate(2); 14. System.out.println(longAccumulator.get()); 15. } 16. }
性能比较代码
1. public class LogKpiCountTest { 2. public static void main(String[] args) { 3. LogKpiCount count = new LogKpiCount(); 4. int size = 100; 5. CountDownLatch latch1 = new CountDownLatch(size); 6. CountDownLatch latch2 = new CountDownLatch(size); 7. CountDownLatch latch3 = new CountDownLatch(size); 8. CountDownLatch latch4 = new CountDownLatch(size); 9. Long startTime=null; 10. startTime=System.currentTimeMillis(); 11. for (int i=0;i<size;i++){ 12. new Thread(()->{ 13. try { 14. for (int j = 0; j < 1000000; j++) { 15. count.synchronizedAdd(); 16. } 17. }finally { 18. latch1.countDown(); 19. } 20. 21. }).start(); 22. } 23. try { 24. latch1.await(); 25. } catch (InterruptedException e) { 26. e.printStackTrace(); 27. } 28. System.out.println("synchronizedAdd执行耗时:"+(System.currentTimeMillis()-startTime)+",执行结果:"+count.num); 29. 30. startTime=System.currentTimeMillis(); 31. for (int i=0;i<size;i++){ 32. new Thread(()->{ 33. try { 34. for (int j = 0; j < 1000000; j++) { 35. count.atomicIntegerAdd(); 36. } 37. }finally { 38. latch2.countDown(); 39. } 40. 41. }).start(); 42. } 43. try { 44. latch2.await(); 45. } catch (InterruptedException e) { 46. e.printStackTrace(); 47. } 48. System.out.println("atomicIntegerAdd执行耗时:"+(System.currentTimeMillis()-startTime)+",执行结果:"+count.atomicInteger.get()); 49. 50. startTime=System.currentTimeMillis(); 51. for (int i=0;i<size;i++){ 52. new Thread(()->{ 53. try { 54. for (int j = 0; j < 1000000; j++) { 55. count.longAdderAdd(); 56. } 57. }finally { 58. latch3.countDown(); 59. } 60. 61. }).start(); 62. } 63. try { 64. latch3.await(); 65. } catch (InterruptedException e) { 66. e.printStackTrace(); 67. } 68. System.out.println("longAdderAdd执行耗时:"+(System.currentTimeMillis()-startTime)+",执行结果:"+count.longAdder.sum()); 69. 70. startTime=System.currentTimeMillis(); 71. for (int i=0;i<size;i++){ 72. new Thread(()->{ 73. try { 74. for (int j = 0; j < 1000000; j++) { 75. count.longAccumulatorAdd(); 76. } 77. }finally { 78. latch4.countDown(); 79. } 80. 81. }).start(); 82. } 83. try { 84. latch4.await(); 85. } catch (InterruptedException e) { 86. e.printStackTrace(); 87. } 88. System.out.println("longAccumulatorAdd执行耗时:"+(System.currentTimeMillis()-startTime)+",执行结果:"+count.accumulator.get()); 89. } 90. 91. static class LogKpiCount{ 92. int num; 93. AtomicInteger atomicInteger = new AtomicInteger(); 94. LongAdder longAdder = new LongAdder(); 95. LongAccumulator accumulator = new LongAccumulator((x,y)->x+y,0); 96. public synchronized void synchronizedAdd(){ 97. num++; 98. } 99. 100. public void atomicIntegerAdd(){ 101. atomicInteger.getAndIncrement(); 102. } 103. 104. public void longAdderAdd(){ 105. longAdder.increment(); 106. } 107. 108. public void longAccumulatorAdd(){ 109. accumulator.accumulate(1); 110. } 111. } 112. }
synchronizedAdd执行耗时:3585,执行结果:100000000
atomicIntegerAdd执行耗时:1567,执行结果:100000000
longAdderAdd执行耗时:629,执行结果:100000000
longAccumulatorAdd执行耗时:132,执行结果:100000000
执行结果如下,发现synchronized与其他三个存在数量级上的差异,AtomicInteger与其他两个存在数量级上的差异。
LongAdder为什么这么快
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。