一. CAS指令与ABA问题
1. 解析CAS
CAS即compare and awap, 字面意思是比较并交换, 具体说就是将寄存器或者某个内存上的值A与另一个内存上的值V进行比较, 如果相同就将B与需要修改的值V进行交换, 并返回交换是否成功的结果.
我们假设内存中的原数据V, 旧的预期值A, 需要修改的新值B, 具体涉及下面三个操作.
比较A与V是否相等(比较).
如果比较相等, 将B写入V(交换), 不相等则不执行任何操作.
返回操作是否成功.
在上述交换过程中, 大多数情况下并不关心B后续的情况, 更关心的是V这个变量的情况, 这里的交换, 也可以近似理解成 “赋值”.
伪代码如下:
boolean CAS(A, B, V) { if (A == V) { V = B; return true; } return false; }
CAS最特殊的地方在于, 上述过程, 并非是通过一段代码实现的, 而是通过一条CPU指令完成的, 该指令是具有原子性的, 是线程安全的.
2. 基于CAS实现的原子类
Java标准库中提供了基于CAS所实现的 “原子类”, 这些类的类名以Atomic开头,针对常用的 int, long 等类型进行了封装, 它们可以基于CAS的方式进行修改, 并且保证线程安全性.
方法 | 解释 |
addAndGet(int delta) | i += delta |
decrementAndGet() | –i |
getAndDecrement() | i– |
incrementAndGet() | ++i |
getAndIncrement() | i++ |
这里举个例子, 典型的就是 AtomicInteger 类, 要实现多线程自增同一个变量, 其中的 getAndIncrement 相当于 i++ 操作.
import java.util.concurrent.atomic.AtomicInteger; public class TestDemo25 { // 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量 public static void main(String[] args) throws InterruptedException { AtomicInteger count = new AtomicInteger(0); Thread t1 = new Thread(() -> { for (int i = 0; i < 50000; i++) { // 这个方法就相当于 count++ count.getAndIncrement(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 50000; i++) { // 这个方法就相当于 count++ count.getAndIncrement(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count.get()); } }
执行结果:
上面的代码就是基于CAS实现的++操作, 不会存在线程安全问题, 这里既能够保证线程安全, 又要比 synchronized高效, synchronized会涉及到锁的竞争, 两个线程要相互等待, CAS不涉及到线程阻塞等待.
上面所使用的AtomicInteger类方法getAndIncrement实现的伪代码如下:
class AtomicInteger { private int value;//保存的值 //自增操作 public int getAndIncrement() { int oldValue = value; while ( CAS(value, oldValue, oldValue+1) != true) { oldValue = value; } return oldValue; } }
对于CAS指令, 它的执行逻辑就是先判断value的值与oldValue的值是否相同, 如果相同就把oldValue+1的值进写入到value中(相当于将value的值加1), oldValue可以理解成寄存器里的值, 相当于是先把value变量内存中的值读至到寄存器当中, 在单线程环境中oldValue的值与value的值一定是相同, 但多线程环境下就不一定了, 因为value的值随时都有可能被其他线程修改, 比如执行完读取value到寄存器, 线程就发生切换了, 另外一个线程, 也进行修改了value的值, 然后当这个线程再被调度回来后, 再进行CAS判定 ,就认为value和oldValue不相等了.
接着往下看while循环, 该循环使用CAS指令是否成功为判断条件, 如果CAS成功了则退出循环, 此时value的值已经加1, 最终返回oldValue; 如果CAS指令失败了; 这就说明有新线程提前对当前的value进行了++, value的值发生了改变, 这这时候就需要将新的value的值赋值给oldValue, 然后尝试重新进行CAS操作, 这样就能保证有几个线程操作, 那就自增几次, 从而也就保证了线程安全.
结合下图, 当两个线程现指令交错的情况, 理解基于CAS指令实现的多线程自增操作是如何保证线程安全的.
3. 基于CAS实现自旋锁
CAS的应用场景处了实现原子类, 还能够实现自旋锁, 伪代码如下:
//自旋锁对象 public class SpinLock { //记录当前锁对象被哪个线程占用,为null,表示锁对象未被占用 private Thread ownerv = null; public void lock(){ // 通过 CAS 看当前锁是否被某个线程持有. // 如果这个锁已经被别的线程持有,那么就自旋等待. // 如果这个锁没有被别的线程持有,那么就把 owner 设为当前尝试加锁的线程. while(!CAS(this.owner, null, Thread.currentThread())){ } } public void unlock (){ this.owner = null; } }
上面CAS与自旋锁的逻辑为了监测当前锁对象是否被线程占用, CAS监测当前的owner是否是null, 如果是null, 就进行交换, 也就是把当前线程的引用赋值给owner, 此时循环结束, 退出lock方法, 加锁就完成了.
如果当前锁, 已经被别的线程占用了, CAS就会发现, this.owner不是null, CAS就不会产生赋值, 也同时返回false, 循环继续执行, 并进行下次判定.
解锁的逻辑简单了, 将占用锁对象的线程(ownerv)置为null即可.
4. ABA问题
CAS指令操作的核心的检查value和oldValue是否相同, 如果相同, 就视为value中途没有被修改过, 所以进行下一步交换操作是没问题的, 在大部分情况下都能保证线程安全.
但这里有一种非常极端的情况, 这里说到的相同, value的值可能是没改过的, 还有可能是value的值被修改后又被改回到原来的值, 比如把value的值设为A的话, CAS判定value为A, 此时可能确实value始终是A, 也可能是value本来是A, 然后被改成了B, 最后又还原成了A.
上数说到的极端情况就是CAS中的ABA问题, 在一些极端场景下就会有bug存在, 比如下面的场景.
有一天, 滑稽老铁要到ATM机去取款, 假设当前滑稽的账户余额1000, 滑稽准备取500, 当滑稽老铁按下取款的这一瞬间, 机器卡了一下, 滑稽下意识就多按了一下, 如果考虑使用CAS的方式来扣款, 系统扣款的情况可能是下图所示:
正常情况下, 即使按下两次取款按钮最终的结果也是正常的, 但考虑一种极端情况, 如果在第一次CAS成功后的一瞬间, 滑稽老铁的朋友又给给滑稽转账了500, 导致第一次CAS扣款500后的余额从500又变回到了1000, 然后紧接着第二次CAS操作也会执行成功, 又成功扣款500, 最终余额变成了500, 这种结果显然是不合理的, 而正确的程序应该是第二次CAS仍然失败, 最终余额为1000元.
上述描述场景是极端的情况, 发生的概率是非常非常低的, 一方面, 恰好滑稽这边多按了几次产生多个扣款操作, 另一方面, 又赶巧在这个非常极限的时间内, 有人转账了一样的金额…
不过上述ABA问题在极端可能下造成的bug也是有办法解决的, 可以针对当前问题引入一个版本号, 假设初始版本号是1, 版本号只能增加不能减少, 每次修改版本号都+1, 然后进行CAS的时候, 就不是以金额值为基准了, 而是以版本号为基准, 在进行CAS操作之前, 都要对版本号进行验证, 如果版本号与之前加载的版本号不同, 则放弃此次CAS指令操作, 看下图理解, 这样最终的结果就是正确的了.
二. JUC中的常见类
Java中的JUC就是来自java.util.concurrent
包下的一些标准类或者接口, 放的都是并发编程(多线程)相关的组件.
1. Callable接口
常见的创建线程的方式有两种方式, 第一种方法是直接继承Thread类, 重写run方法, 第二种方法是实现Runnable接口, 然后还是要靠Thread类的构造器, 把Runnable传进去, 最终调用的就是Runnable的run方法。; 和Runnable类似, 我们还可以通过Callable接口描述一个任务配合FutureTask类来创建线程, 和Runnable不同的是, Callable接口配合FutureTask类所创建的线程其中的任务是可以带有返回值的, 而一开始提到的那两种方式任务是不支持带返回值的.
Callable接口中有一个call方法(返回值是泛型参数), 就相当于Runnable接口中的run方法(无返回值), FutureTask可用于异步获取执行结果或取消执行任务的场景, 通过传入Runnable或者Callable的任务给FutureTask, 直接调用其run方法或者放入线程池执行, 之后可以在外部通过FutureTask的get方法异步获取执行结果, 如果任务还没有执行完毕, get方法会阻塞直到任务返回结果.
理解FutureTask可以为想象去吃麻辣烫, 当餐点好后, 后厨就开始做了, 同时前台会给你一张 “小票”, 这个小票就是FutureTask, 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没.
使用Thread类的构造器创建线程的时候, 传入的引用不能是Callable类型的, 而应该是FutrueTask类型, 因为构造器中传入的任务类型需要是一个Runnable类,Callable与Runnable是没有直接关系的, 但FutrueTask类实现了Runnable类, 所以要想使用Callable创建线程, 我们就需要先把实现Callable接口的对象引用传给FutrueTask类的实例对象, 再将FutrueTask实例传入线程构造器中.
总结一下就是, 我们可以用Callable用来描述任务, FutureTask类用来管理Callable任务的执行结果.
比如, 我们使用Callable来计算 1 + 2 + 3 + … + 1000 的值, 并通过返回值的方式获取执行结果.
代码示例:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class TestDemo26 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 使用 Callable 来计算 1 + 2 + 3 + ... + 1000 Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 1000; i++) { sum += i; } return sum; } }; FutureTask<Integer> futureTask = new FutureTask<>(callable); Thread t = new Thread(futureTask); t.start(); //获取执行结果 Integer sum = futureTask.get(); System.out.println(sum); } }
执行结果:
2. ReentrantLock类(可重入锁)
ReentrantLock是除了synchronized外标准库给我们提供的另一种可重入锁, 与synchronized不同的是, ReentrantLock是通过lock方法加锁,unlock方法解锁, 相比于synchronized直接基于代码块的方式来加锁解锁更加传统.
正是由于加锁解锁两个操作是分开的, 所以代码的写法上需要格外注意, 一方面lock后如果之后的工作代码比较长久容易忘记去unlock从而造成死锁, 另一方面加锁后解锁前中间的代码万一出了问题(比如直接return或者出现异常), 都可能导致不能顺利执行unlock造成死锁.
ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); // working reentrantLock.unlock();
所以使用ReentrantLock类时, 一般要搭配finally使用, 将unlock放入到finally保证unlock一定会执行.
ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); try { // working } finally { reentrantLock.unlock(); } reentrantLock.unlock();
🎯ReentrantLock相较于synchronized的优势:
ReentrantKLock提供了公平锁版本的实现, 而synchronized只实现了非公平锁, ReentrantKLock默认是非公平锁, 而公平锁版本只需要指定fair参数为true即可..
对于synchronized来说, 提供的加锁操作就是"死等", 也就是说如果锁已经被占用, 只要锁没释放就需要一直等下去,而ReentrantLock提供了更灵活的等待方式: tryLeock.
方法 | 解释 |
boolean trylock() | 无参数, 能加锁就加, 加不上就放弃 |
boolean trylock(long timeout, TimeUnit unit) | 有参数, 超过指定时间, 加不上锁就放弃 |
ReentrantLock提供了一个更方便的等待通知机制, synchronized搭配的是wait, notify, 当我们notify的时候是随即唤醒一个wait状态的线程; 而ReentrantLock搭配一个Condition类, 进行唤醒的时候可以唤醒指定线程.
3. Semaphore类(信号量)
Java中信号量(Semaphore)是把操作系统原生的信号量封装了一下, 本质就是一个计数器, 描述了 “可用资源的个数”,主要涉及到两个操作
P操作: 申请一个可用资源, 计数器 -1.
V操作: 释放一个可用资源, 计数器 +1.
如果计数器为0了, 继续Р操作, 就会出现阻塞等待的情况.
🍂举个例子来理解信号量:
会开车的应该经常会碰到, 停车, 停车场门口有一个灯牌, 会显示停车位还剩余多少个, 每进去一辆车, 显示的停车位数量就-1, 就相当于进行了一次P操作, 每出去一辆车, 显示的停车位数量就+1, 就相当于进行了一次V操作; 而当停车场的剩余车位为0时, 显示的停车位数量就为0了, 此时如果还有车想停, 要么在这里等, 要么就去其他停车场.
🎯Semaphore类的常用方法:
构造方法
public Semaphore(int permits) | 构造可用资源为permits个的信号量对象 |
public Semaphore(int permits, boolean fair) | 构造可用资源为permits个的信号量对象, 并指定信号量是否是公平性质的 |
PV方法
Semaphore的PV操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.
public void acquire() throws InterruptedException | 申请可用资源 |
public void release() | 释放可用资源 |
代码示例:
创建Semaphore示例, 初始化为4, 表示有4个可用资源.
acquire方法表示申请资源(P操作), release方法表示释放资源(V操作).
创建20个线程, 每个线程都尝试申请资源, sleep1秒之后, 释放资源. 观察程序的执行效果.
import java.util.concurrent.Semaphore; public class Test { public static void main(String[] args) { Semaphore semaphore = new Semaphore(4); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("申请资源"); semaphore.acquire(); System.out.println("我获取到资源了"); Thread.sleep(1000); System.out.println("我释放资源了"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 20; i++) { Thread t = new Thread(runnable); t.start(); } } }
执行结果:
考虑这里一个场景, 假设有一个计数器初始值为1的信号量, 针对这个信号量的值, 就只有1和0两种取值, 因为信号量不能是负的,
执行一次Р操作, 1->0
执行一次V操作, 0->1
如果已经进行一次Р操作了, 继续进行Р操作, 就会阻塞等待, 这是不是和锁的功效有点类似呢?
锁就可以视为 “二元信号量”, 可用资源就1个, 计数器的取值非0即1, 可以说, 锁是信号量的一种特殊情况, 信号量就把锁推广到了一般情况, 描述了可用资源更多的时候是如何处理的.
所以说, 计数器初始值为1的信号量就可以当成锁来使用, 这里我们编写一个代码实现两个线程增加同一个变量, 使用Semphore来控制线程安全.
import java.util.concurrent.Semaphore; public class TestDemo27 { // 编写代码实现两个线程增加同一个变量, 使用 Semphore 来控制线程安全. public static int count = 0; public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(1); Thread t1 = new Thread(() -> { for (int i = 0; i < 50000; i++) { try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 50000; i++) { try { semaphore.acquire(); count++; semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } }
执行结果:
4. CountDownLatch同步工具类
CountDownLatch是一个同步工具类, 用来协调多个线程之间的同步, 或者说起到线程之间的通信(而不是用作互斥的作用).
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后, 再继续执行; 使用一个计数器进行实现, 计数器初始值为线程的数量; 当每一个线程完成自己任务后, 计数器的值就会-1, 当计数器的值为0时, 表示所有的线程都已经完成一些任务, 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务.
想象一个场景进行理解, 假设有一场跑步比赛, 比赛开始时间是明确的, 裁判的发令枪响了比赛就开始了, 但结束时间是不确定的, 只有当所有的选手都冲过了终点线才算结束, 这里的运动员就相当于线程, 而CountDownLatch类用判断什么时候最后一个远动员冲过终点线, 即这些线程在什么时候可以全部执行结束.
🎯CountDownLatch类常用方法:
构造方法
public CountDownLatch(int count) | 构造实例对象, count表示CountDownLatch对象中计数器的值 |
普通方法
public void await() throws InterruptedException | 使所处的线程进入阻塞等待, 直到计数器的值清零 |
public void countDown() | 将计数器的值减1 |
public long getCount() | 获取计数器最初的值 |
代码示例:
10个选手依次就位, 哨声响才同时出发, 所有选手都通过终点, 比赛结束.
构造CountDownLatch实例, 初始化10表示有10个任务需要完成(10个选手参加比赛).
每个任务执行完毕, 都调用latch.countDown(), 在CountDownLatch内部的计数器同时自减(有一个选手冲过了终点线).
主线程中使用 latch.await(), 阻塞等待所有任务执行完毕, 相当于计数器为0了(所有选手都冲过了终点线比赛结束).
import java.util.concurrent.CountDownLatch; public class Test2 { public static void main(String[] args) throws InterruptedException { //构造方法的参数表示有几个选手 CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { Thread t = new Thread(() -> { try { Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 到达终点"); latch.countDown(); //调用countDown的次数和个数一致 } catch (InterruptedException e) { e.printStackTrace(); } }); t.start(); } //裁判要等所有线程到达 //当这些线程没有执行完的时候,await就会阻塞,所有线程执行完了,await 才返回 latch.await(); System.out.println("比赛结束"); } }
执行结果:
实际开发中这样的场景也是存在的, 比如多线程下载(迅雷, steam等下载器), 当我们下载一个比较大的文件资源(电影), 通过多线程下载就可以提高下载速度, 把一个大的文件拆成多个部分安排多个线程下载, 每个线程负责下载其中的一个部分, 等到是所有的线程都完成自己的下载, 才算把整个文件下载完, 这里就可以用到CountDownLatch来判断文件整体是否下载完毕, 多线程下载是充分利用了带宽(下载是IO操作, 和CPU关系不大).