文章目录:
1.CAS
CAS(Compare And Swap)是由硬件实现的。CAS 可以将 read- modify - write 这类的操作转换为原子操作。
i++自增操作包括三个子操作: 从主内存读取 i 变量值;对 i 的值加 1;再把加 1 之后的值保存到主内存。
CAS原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。
下面我说一下,我对上面这张图中CAS算法的理解:首先在主内存有一个所有线程都共享的count变量,它的值初始化为10。
1. 线程1从主内存中读取count变量的值到自己的工作内存中,此时count为10。(这个值我们记为:线程1期望的count值、最初读取到的count值)
2. 线程1对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程1更新之后的count值)
3. 这个时候,当线程1还未执行到第三步更新count的值到主内存中的时候,线程2突然抢到了CPU执行权,它进来从主内存中读取count变量的值到自己的工作内存中,因为线程1还未更新,所以此时count仍为10。(这个值我们记为:线程2期望的count值、最初读取到的count值)
4. 线程2对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程2更新之后的count值)
5. 在线程2要将自己工作内存中的count值更新到主内存之前,要执行一部重要的操作!!!就是再次读取主内存中共享变量count的值,如果此时读取到的这个值与最开始线程2期望的count值相等,那么就顺利更新count的值;如果不相等,就撤销放弃本次count++操作。
6. 那么线程2最开始期望的count值为10,而此时从主内存中读取到的count值还是10,一样,所以就更新,将自己工作内存中的count值更新到主内存中,此时count为11。
7. 那么线程2执行完了,此时线程1重新获得CPU执行权,它已经完成了count++的前两步了,就剩最后一步更新操作了,那么它此时同线程2一样,再次从主内存中读取共享变量count的值,一样就更新、不一样就撤销放弃操作。然而count的值已经被线程2更新过了,此时为11,而线程1最开始期望的count的值为10,两次读取到count的值不一样了!!!所以线程1就会自动撤销本次count++操作。
8. 最终,本次线程1、2对count变量执行++操作的结果:线程1失败、线程2成功,最后count的值只增加了1次,为11。
下面,我再来说一下什么是CAS中的ABA问题。
· CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。
· 实际上这种假设不一定总是成立的。假如说有共享变量 count=100。
· A线程将 count 修改为 200
· B线程将 count 修改为 300
· C线程将 count 修改为 100
· 当前线程看到的count变量的值是100,这就与最初的count变量的值是一样的,那么是否就认为count变量的值没有被其他线程更新呢?显然不是啊,它明显的被A、B两个线程更新过。
· 这就是CAS中的ABA问题,即共享变量经历了 A → B → A 的更新。
· 如果想要规避ABA问题,可以为共享变量引入一个修订好(或者叫时间戳),每次修改共享变量时,相应的修订号就会增加1,ABA问题的过程就转变为:[A,0] → [B,1] → [A,2] ,每次修改共享变量都会导致修订号的增加,通过修订号就可以准确的判断共享便是否被其他线程修改过。在原子类中,AtomicReference就会面临ABA问题的困扰,而AtomicStampedReference 可以很好的规避ABA问题。详细内容看下面的代码案例。
2.原子变量类
原子变量类基于CAS实现的,当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性。对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++。由于volatile只能保证可见性、无法保障原子性,原子变量类内部就是借助一个Volatile变量,并且保障了该变量的 read-modify-write 操作的原子性,有时把原子变量类看作增强的 volatile 变量。
原子变量类有 12 个,如:(这12个原子变量类全部位于JUC这个Java并发包的atomic子包下的)
分组 |
原子变量类 |
基础数据型 |
AtomicInteger, AtomicLong,AtomicBoolean |
数组型 |
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray |
字段更新器 |
AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater |
引用型 |
AtomicReference,AtomicStampedReference,AtomicMarkableReference |
package com.szh.volatiletest; import java.util.concurrent.atomic.AtomicInteger; /** * 使用原子类进行自增操作 */ public class Test03 { public static void main(String[] args) throws InterruptedException { //在main线程中创建10个子线程 for (int i = 0; i < 10; i++) { new MyThread().start(); } Thread.sleep(1000); System.out.println(MyThread.count.get()); } static class MyThread extends Thread { private static AtomicInteger count=new AtomicInteger(); public static void addCount() { for (int i = 0; i < 1000; i++) { count.getAndIncrement(); } System.out.println(Thread.currentThread().getName() + " count = " + count.get()); } @Override public void run() { addCount(); } } }
首先是AtomicInteger,它其中的 getAndIncrement 方法就是 count++ 操作,get() 方法是获取最终的执行结果,可以看到10个子线程各自执行1000次 count++ 操作,最终的结果正确无误:10000。
这里我是用 AtomicLong 定义一个简单的计数器,来模拟实现原子操作。
main主线程中一共创建了1000个子线程,每个线程都看作一个请求,所以请求总数肯定是1000,然后我模拟偶数就算请求成功、奇数就算请求失败。那么这个时候,因为Java中的多线程是采用抢占式来调度的,所以这里成功和失败的比值一般来说不会是1:1(各500次),输出结果中,我们也看到了是494、506,当然这个执行结果也不是唯一的。
package com.szh.atomiclong; import java.util.concurrent.atomic.AtomicLong; /** * 使用原子变量类定义一个计数器 * 该计数器,在整个程序中都能使用,所以这个计数器可以设计为单例 */ public class Indicator { //构造方法私有化 private Indicator() {} //定义一个私有的本类静态对象 private static final Indicator INSTANCE=new Indicator(); //提供一个公共静态方法,获取该类唯一的实例 public static Indicator getInstance() { return INSTANCE; } //使用原子变量类 private final AtomicLong requestCount=new AtomicLong(0); //记录请求总数 private final AtomicLong successCount=new AtomicLong(0); //记录成功总数 private final AtomicLong failureCount=new AtomicLong(0); //记录失败总数 //有新的请求 public void newRequestReceive() { requestCount.getAndIncrement(); } //处理成功 public void requestReceiveSuccess() { successCount.getAndIncrement(); } //处理失败 public void requestReceiveFialure() { failureCount.getAndIncrement(); } //查看请求总数 public long getRequestCount() { return requestCount.get(); } //查看成功总数 public long getSuccessCount() { return successCount.get(); } //查看失败总数 public long getFailureCount() { return failureCount.get(); } }
package com.szh.atomiclong; import java.util.Random; /** * */ public class Test { public static void main(String[] args) { for (int i = 0; i < 1000; i++) { new Thread(new Runnable() { @Override public void run() { //每个线程就是一个请求,请求总数加1 Indicator.getInstance().newRequestReceive(); int num=new Random().nextInt(); if (num % 2 == 0) { //模拟偶数为请求成功 Indicator.getInstance().requestReceiveSuccess(); }else { //模拟奇数为请求失败 Indicator.getInstance().requestReceiveFialure(); } } }).start(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //打印请求数 System.out.println(Indicator.getInstance().getRequestCount()); //请求总数 System.out.println(Indicator.getInstance().getSuccessCount()); //成功总数 System.out.println(Indicator.getInstance().getFailureCount()); //失败总数 } }
案例1:AtomicIntegerArray中的常用方法。
package com.szh.atomicarray; import java.util.concurrent.atomic.AtomicIntegerArray; /** * AtomicIntegerArray 原子数组类的基本操作 */ public class Test01 { public static void main(String[] args) { //创建一个指定长度的原子数组 AtomicIntegerArray array=new AtomicIntegerArray(10); System.out.println(array); //返回指定位置的元素 System.out.println(array.get(0)); System.out.println(array.get(8)); //设置指定位置的元素 array.set(0,20); //在设置某个位置元素的新值时,返回该元素的旧值 System.out.println(array.getAndSet(1,66)); System.out.println(array); //将某个位置元素的值,加上一个数,返回该元素的新值 System.out.println(array.addAndGet(0,30)); System.out.println(array.addAndGet(1,4)); System.out.println(array); //CAS操作 //如果数组中索引为0的元素的值为50,则将其替换为新值1,返回true //反之,如果该索引的元素的值不是50,则不进行替换,返回false System.out.println(array.compareAndSet(0,50,1)); System.out.println(array); //自增/自减 System.out.println(array.incrementAndGet(0)); // ++i System.out.println(array.getAndIncrement(1)); // i++ System.out.println(array.decrementAndGet(2)); // --i System.out.println(array.getAndDecrement(3)); // i-- System.out.println(array); } }
案例2:定义一个原子数组,一个线程数组,10个线程分别将该原子数组中的每个元素自增1000次,所以最终得到的原子数组中,每个元素的值应该为:10*1000=10000。
package com.szh.atomicarray; import java.util.concurrent.atomic.AtomicIntegerArray; /** * 在多线程中使用 AtomicIntegerArray 原子数组类 */ public class Test02 { //定义一个原子数组 static AtomicIntegerArray array=new AtomicIntegerArray(10); public static void main(String[] args) { //定义一个线程数组 Thread[] threads=new Thread[10]; for (int i = 0; i < threads.length; i++) { threads[i]=new AddThread(); } //开启子线程 for (Thread thread : threads) { thread.start(); } //在主线程中查看自增之后的原子数组中每个元素的值,这里需要等待所有子线程执行完毕之后再查看 //所以这里可以使用join方法进行线程合并,把所有的子线程合并到当前主线程中 for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } //打印原子数组 System.out.println(array); } //定义一个线程类,在这个类中修改原子数组 static class AddThread extends Thread { @Override public void run() { //将原子数组中的每个元素自增1000次 for (int j=0;j<1000;j++) { for (int i = 0; i < array.length(); i++) { array.getAndIncrement(i % array.length()); } } } } }
AtomicIntegerFieldUpdater可以对原子整数字段进行更新,要求:
1. 字符必须使用 volatile 修饰,使线程之间可见。
2. 只能是实例变量,不能是静态变量,也不能使用 final 修饰。
在这个Java Bean中,我将age属性设置为volatile修饰,也就是说此时这个age对所有的线程都是可见的。
package com.szh.atominintegerfiledupdater; /** * */ public class User { int id; volatile int age; public User(int id, int age) { this.id = id; this.age = age; } @Override public String toString() { return "User{" + "id=" + id + ", age=" + age + '}'; } }
package com.szh.atominintegerfiledupdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * */ public class SubThread extends Thread { //创建要更新的User对象 private User user; //创建 AtomicIntegerFieldUpdater 更新器 private AtomicIntegerFieldUpdater<User> updater=AtomicIntegerFieldUpdater.newUpdater(User.class,"age"); public SubThread(User user) { this.user=user; } @Override public void run() { //在子线程中对User对象的age字段进行自增操作 for (int i=0;i<10;i++) { System.out.println(updater.getAndIncrement(user)); } } }
上面这段代码,我定义了一个字段更新器,它的构造函数中传入两个值,第一个是User.class表示要更新User这个类(传入该类的字节码文件),第二个参数代表你要更新的字段名(必须是volatile修饰的)。
下面这段代码,main主线程中开启了10个子线程,每个线程执行自己的run方法时,都对age字段进行10次的自增操作,那么最终age字段一共会被自增10*10=100次,而age的初始值我们传入了20,最终age=20+100=120。
package com.szh.atominintegerfiledupdater; /** * */ public class Test { public static void main(String[] args) { User user=new User(1001,20); //开启10个线程 for (int i = 0; i < 10; i++) { new SubThread(user).start(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(user); } }
案例1:这个原子类属于引用型的,可以原子的读写一个对象。
package com.szh.atomicreference; import java.util.concurrent.atomic.AtomicReference; /** * 使用 AtomicReference 原子读写一个对象 */ public class Test01 { //创建一个AtomicReference对象 static AtomicReference<String> reference=new AtomicReference<>("abc"); public static void main(String[] args) throws InterruptedException { //开启10个线程,来修改字符串 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { if (reference.compareAndSet("abc","def")) { System.out.println(Thread.currentThread().getName() + " 把字符串abc被修改为了def"); } } }).start(); } //开启10个线程,来修改字符串 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { if (reference.compareAndSet("def","abc")) { System.out.println(Thread.currentThread().getName() + " 把字符串def还原为了abc"); } } }).start(); } Thread.sleep(1000); System.out.println(reference.get()); } }
案例2:演示AtomicReference中的ABA问题。
package com.szh.atomicreference; import java.util.concurrent.atomic.AtomicReference; /** * */ public class Test02 { private static AtomicReference<String> reference=new AtomicReference<>("abc"); public static void main(String[] args) throws InterruptedException { //创建一个线程,先把abc修改为def,再把def还原为abc Thread t1=new Thread(new Runnable() { @Override public void run() { reference.compareAndSet("abc","def"); System.out.println(Thread.currentThread().getName() + " ---> " + reference.get()); reference.compareAndSet("def","abc"); } }); Thread t2=new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(reference.compareAndSet("abc","xyz")); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(reference.get()); } }
这个原子类可以很好的规避CAS中的ABA问题。
package com.szh.atomicreference; import java.util.concurrent.atomic.AtomicStampedReference; /** * AtomicStampedReference 原子类可以解决 CAS 中的 ABA 问题 * 在 AtomicStampedReference 原子类中有一个整数标记值 stamp, * 每次执行 CAS 操作时,需要对比它的版本,即比较 stamp 的值 */ public class Test03 { //定义 AtomicStampedReference 引用操作"abc"字符串, 指定初始化版本号为 0 private static AtomicStampedReference<String> reference=new AtomicStampedReference<>("abc",0); public static void main(String[] args) throws InterruptedException { Thread t1=new Thread(new Runnable() { @Override public void run() { reference.compareAndSet("abc","def",reference.getStamp(),reference.getStamp()+1); System.out.println(Thread.currentThread().getName() + " ---> " + reference.getReference()); reference.compareAndSet("def","abc",reference.getStamp(),reference.getStamp()+1); } }); Thread t2=new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(reference.compareAndSet("abc","xyz",reference.getStamp(), reference.getStamp()+1)); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(reference.getReference()); } }