在JDK 1.5之后,java api中提供了java.util.concurrent包,简称JUC包。这个包定义了很多我们非常熟悉的工具类,比如原子类AtomicXX,线程池executors、信号量semaphore、阻塞队列、同步器等。日常并发编程要用的熟面孔基本都在这里。
首先,Atomic包,原子操作类,提供了用法简单、性能高效、最重要是线程安全的更新一个变量。支持整型、长整型、布尔、double、数组、以及对象的属性原子修改,支持种类非常丰富。
之前的文章《JAVA并发编程volatile核心原理》说过,volatile只是解决了多线程的可见性和有序性问题,原子性问题并没有解决。 在这里volatile+Atomic原子类可以完美实现多线程共享变量的安全修改。
1、Atomic原子类并发修改共享变量demo
package lading.java.mutithread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * 多线程并发下,使用原子类+volatile进行操作,实现共享变量修改并发安全 * 10个线程并发对共享变量count进行求和,每个线程求和100次,最后结果是1000 */ public class Demo005AtomicCount { public static volatile AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) { int threadNum = 10; ExecutorService threadPool = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { //10个线程并发对共享变量count进行求和,每个线程求和100次 threadPool.submit(() -> { System.out.println(Thread.currentThread().getName() + "开始对count求和100次"); for (int j = 0; j < 100; j++) { count.getAndIncrement(); } }); } threadPool.shutdown(); System.out.println("最后count的值:" + count.get()); } }
结果,刚好就是1000.
2、说一下CAS原理
CAS 英文原名compare AND swap(比较再置换)。在JUC包中,大量应用了CAS的原理实现,比如AQS ,concurrentHashMap都有应用CAS技术。可以说,synchronized、volatile、CAS就是JAVA并发编程里的基石。
CAS本质是一条CPU指令,fun(address,oldValue,newValue),其中Address就是本次操作要读取旧值以及修改为新值的内存地址,oldValue就是该内存地址之前的旧值,newValue就是本次操心希望写入的新值。
核心原理:判断内存地址address当前的值是否与oldValue相同,如果相同就更新该内存地址的值为newValue。
先看一下源码,这个AtomicInteger +1源码:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } 然后compareAndSwapInt的源码如下: public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
这里this.compareAndSwapInt(var1, var2, var5, var5 + var4));可以看到其实是4个变量,实际就是我们常说的三个变量,因为Object var1表示这个对象内存地址,var2只是偏移量。
这个偏移量有什么用呢?继续看unsafe c++的源码,发现,其实和我们之前讲的JVM内存模型是一样的,一个对象有自己的地址,对象在内存中会占据一段内存区域,这一片区域会有对象的各种信息,比如锁状态,对象头信息,最重要的是还有我们这个对象的值。那如何找到这个值的内存地址呢?
这时候就是需要这个long var2 偏移量。通过这个var2才可以读到当前变量的旧值old。然后拿这个old和预期的oldValue去比对,相等了就把newValue更新到值的内存地址。
3、CAS有什么缺陷吗
在2的核心原理源码看到,很明显,如果发现从内存地址里值不是预期的oldValue,那就陷入了死循环。CAS就是一个while循环判断内存地址值是否等于预期值,不等于就继续循环。
3.1 CAS一直不成功,就会导致CPU自旋时间过长,CPU开销过大。
3.2 A->B->A问题,俗称ABA问题。
举个例子,有个变量name="A",有个线程想把name从A修改为C。但是很不幸在次过程中被别的线程修改为B后,又再修改为A,它才能执行。再具体一点就是:有个人存款有100块,想去取出来,但是取款机显示余额是0,无法取现;过了一会,发现余额是100,这才能取现。这个诡异的过程就是ABA的问题,CAS对变量预期旧值的循环变化是无感的。
4、如何解决CAS的ABA问题
我们先看一下ABA的demo。
package lading.java.mutithread; import java.util.concurrent.atomic.AtomicInteger; /** * CAS ABA问题复现 * count 原值100 */ public class Demo006CasBug { public static volatile AtomicInteger count = new AtomicInteger(100); public static void main(String[] args) throws InterruptedException { //线程1先修改为200,然后马上修改为100 Thread thread1 = new Thread(() -> { count.compareAndSet(100, 200); count.compareAndSet(200, 100); System.out.println(Thread.currentThread().getName() + "完成对count的从100修改为200,然后再修改为100"); }); Thread thread2 = new Thread(() -> { count.compareAndSet(100, 300); System.out.println(Thread.currentThread().getName() + "对count的从100修改为" + count.get()); }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); } }
结果是线程1无感的修改为300,实际上期间线程0已经修改了2次。
JDK提供了AtomicStampedReference来解决。解决demo:
package lading.java.mutithread; import java.util.concurrent.atomic.AtomicStampedReference; /** * 解决CAS ABA问题 * 用AtomicStampedReference,给原子类增加一个时间戳6666,这样每次修改需要增加判断预期的时间戳 * 类似数据库mysql更新一个记录,update set col1='' where col2=oldVal and timestamp=oldVal * 在cas基础上增加一个数据时间戳,确保数据未更新修改过。 */ public class Demo007CasBugFix { public static AtomicStampedReference<Integer> count = new AtomicStampedReference<>(100, 6666); public static void main(String[] args) { //线程1先修改为101,然后马上修改为100 Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + "第一次修改前时间戳版本号:" + count.getStamp()); boolean isSuccess1 = count.compareAndSet(100, 101, count.getStamp(), count.getStamp() + 1); System.out.println("第一次修改:" + (isSuccess1 ? "成功" : "失败") + ",当前版本号为:" + count.getStamp() + ",count值为:" + count.getReference()); System.out.println(Thread.currentThread().getName() + "第二次修改前时间戳版本号:" + count.getStamp() + ",count值为:" + count.getReference()); boolean isSuccess2 = count.compareAndSet(101, 100, count.getStamp(), count.getStamp() + 1); System.out.println("第二次修改:" + (isSuccess2 ? "成功" : "失败")); //如果线程1完成了【先修改为200,然后马上修改为100】 if (isSuccess1 && isSuccess2) { System.out.println(Thread.currentThread().getName() + "成功完成A->B->A的修改"); } }, "线程1"); Thread thread2 = new Thread(() -> { int versionNum = count.getStamp(); System.out.println(Thread.currentThread().getName() + "第一次修改前时间戳版本号:" + versionNum); try { Thread.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } boolean isSuccess = count.compareAndSet(100, 300, versionNum, versionNum + 1); if (isSuccess) { System.out.println(Thread.currentThread().getName() + "修改成功,对count的从100修改为300"); } else { System.out.println(Thread.currentThread().getName() + "修改失败,count值仍然为:" + count.getReference()); } }, "线程2"); thread1.start(); thread2.start(); } }
结果,因为时间戳已经被修改了,线程1就没法修改cout,避免了ABA问题: