CAS
CAS介绍
CAS的全称是:比较并交换(Compare And Swap)。在CAS中,有这样三个值:
- V:变量var,也即AtomicInteger类当中被声明为volatile 的value
- E:期望值(expected)
- U:新值(update)
其实CAS实现方法的入参处,还有一个值叫做valueOffset,即V的内存地址valueOffset
比较并交换的过程如下:
判断V是否等于E,如果等于,将V的值设置为U;如果不等,说明已经有其它线程更新了变量v,则当前线程放弃更新,什么都不做。
所以这里的期望值E本质上指的是“旧值”。
我们以一个简单的例子来解释这个过程: 如果有一个多个线程共享的变量i原本等于5,我现在在线程A中,想把它设置为新的值6。我们使用CAS来做这个事情: cas成功:首先我们用i去与5对比,发现它等于5,说明没有被其它线程改过,那我就把它设置为新的值6,此次CAS成功,i的值被设置成了6; cas失败:如果不等于5,说明i被其它线程改过了(比如现在i的值为2),那么我就什么也不做,此次CAS失败,i的值仍然为2。
在这个例子中,i就是V,5就是E,6就是U。
那有没有可能我在判断了i为5之后,正准备更新它的新值的时候,被其它线程更改了i的值呢?
不会的。因为CAS是一种原子操作,它是一种系统原语,是一条CPU的原子指令,从CPU层面保证它的原子性。 当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
自旋锁的基础就是CAS
CAS是实现自旋锁的基础,CAS利用CPU指令保证了操作的原子性,以达到加锁的效果。
至于自旋呢,看字面意思也很明白,自己旋转,是指尝试获取锁的线程不会立即阻塞而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。
●这样的好处是减少线程上下文切换的消耗,
●缺点是循环会消耗CPU。
手写自旋锁:
package com.bilibili.juc.cas; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * 题目:实现一个自旋锁,复习CAS思想 * 自旋锁好处:循环比较获取没有类似wait的阻塞。 * * 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现 * 当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。 */ public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void lock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"\t"+"----come in"); while (!atomicReference.compareAndSet(null, thread)) { } } public void unLock() { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread,null); System.out.println(Thread.currentThread().getName()+"\t"+"----task over,unLock..."); } public static void main(String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.lock(); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } spinLockDemo.unLock(); },"A").start(); //暂停500毫秒,线程A先于B启动 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { spinLockDemo.lock(); spinLockDemo.unLock(); },"B").start(); } }
乐观锁|悲观锁与CAS的关系
锁可以从不同的角度分类。其中,乐观锁和悲观锁是一种分类方式。
悲观锁:
悲观锁就是我们常说的锁。对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,以保证临界区的程序同一时间只能有一个线程在执行。
乐观锁:
乐观锁又称为“无锁”,顾名思义,它是乐观派。乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说乐观锁天生免疫死锁。
悲观锁和乐观锁的区别:
乐观锁多用于“读多写少“的环境,避免频繁加锁影响性能;而悲观锁多用于”写多读少“的环境,避免频繁失败和重试影响性能。
CAS存在的问题
自旋
CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。
- 这样的好处是减少线程上下文切换的消耗
- 缺点是循环会消耗CPU
解决思路是让JVM支持处理器提供的pause指令。pause指令能让自旋失败时cpu睡眠一小段时间再继续自旋,从而使得读操作的频率低很多,为解决内存顺序冲突而导致的CPU流水线重排的代价也会小很多。
ABA问题
所谓ABA问题: 比如说一个线程1从内存位置V中取出A 这时候另一个线程2也从内存中取出A,并且线程2进行了一些操作将值变成了B,然后线程2又将V位置的数据变成A放回去 这时候线程1进行CAS操作发现内存中仍然是A,预期OK,然后线程1操作成功
尽管线程1的CAS操作成功,但是不代表这个过程就是没有问题的。
段子:老刘出差媳妇在家,出差期间老刘的隔壁老王在练腰,竞争对手在磨刀,问老刘回家后其媳妇有没有发生变化?
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Test { static AtomicInteger atomicInteger = new AtomicInteger(100); public static void main(String[] args) { new Thread(() -> { atomicInteger.compareAndSet(100, 101); try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } atomicInteger.compareAndSet(101, 100); }, "t1").start(); new Thread(() -> { try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicInteger.compareAndSet(100, 2022) + "\t" + atomicInteger.get()); }, "t2").start(); } }
ABA问题的解决思路是在变量前面追加上版本号|时间戳|戳记流水,择其一。
从JDK 1.5开始,JDK的atomic包里提供了一个类AtomicStampedReference类来解决ABA问题。
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicStampedReference; public class Test { static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1); public static void main(String[] args) { new Thread(() -> { int stamp = stampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp); //暂停500毫秒,保证后面的t4线程初始化拿到的版本号和我一样 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1); System.out.println(Thread.currentThread().getName() + "\t" + "2次流水号:" + stampedReference.getStamp()); stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1); System.out.println(Thread.currentThread().getName() + "\t" + "3次流水号:" + stampedReference.getStamp()); }, "t3").start(); new Thread(() -> { int stamp = stampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp); //暂停1秒钟线程,等待上面的t3线程,发生了ABA问题 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } boolean b = stampedReference.compareAndSet(100, 2022, stamp, stamp + 1); System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp()); }, "t4").start(); } }
这个类的compareAndSet方法的作用,不光先检查当前引用是否等于预期引用,并且会检查当前标志是否等于预期标志。
原子类
Atomic的理念
场景一:原子性破坏
synchronized保证i++在多线程情况下的安全?
//volatile关键字保证可见性 volatile int number = 0; public int getNumber() { return number; } //synchronized关键字保证多线程情况下i++的原子性 public synchronized void setNumber() { number++; }
但是synchronized比较重,性能比较慢,代码写起来也复杂
由此Java引入了原子类型包,原子类底层是CAS,无需手动加锁即可实现锁的效果,相当于乐观锁
AtomicInteger atomicInteger = new AtomicInteger(); public int getNumber() { return atomicInteger.get(); } public void setNumber() { atomicInteger.getAndIncrement(); }
场景二:主线程抢占
使用AtomicInteger,多线程对i++累加
public static final int SIZE = 50; public static void main(String[] args) throws InterruptedException { MyNumber myNumber = new MyNumber(); for (int i = 1; i <= SIZE; i++) { new Thread(() -> { for (int j = 1; j <= 1000; j++) { myNumber.addPlusPlus(); } }, String.valueOf(i)).start(); } //暂停几秒钟线程,等待上面50个线程全部计算完成后,再去获得最终值,否则主线程会抢占,导致最终打印的i不是50000 //try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get()); }
但这种手动加睡眠时间的方式,很挫try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
public static final int SIZE = 50; public static void main(String[] args) throws InterruptedException { MyNumber myNumber = new MyNumber(); CountDownLatch countDownLatch = new CountDownLatch(SIZE); for (int i = 1; i <= SIZE; i++) { new Thread(() -> { try { for (int j = 1; j <= 1000; j++) { myNumber.addPlusPlus(); } } finally { //countDown() countDownLatch.countDown(); } }, String.valueOf(i)).start(); } //await() countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get()); }