Java多线程之CAS中的ABA问题与JUC的常见类

简介: Java多线程之CAS中的ABA问题与JUC的常见类

一. CAS指令与ABA问题

1. 解析CAS

CAS即compare and awap, 字面意思是比较并交换, 具体说就是将寄存器或者某个内存上的值A与另一个内存上的值V进行比较, 如果相同就将B与需要修改的值V进行交换, 并返回交换是否成功的结果.

我们假设内存中的原数据V, 旧的预期值A, 需要修改的新值B, 具体涉及下面三个操作.

73d8c9be8b2a4960a39693770de0ac9a.png

比较A与V是否相等(比较).

如果比较相等, 将B写入V(交换), 不相等则不执行任何操作.

返回操作是否成功.

在上述交换过程中, 大多数情况下并不关心B后续的情况, 更关心的是V这个变量的情况, 这里的交换, 也可以近似理解成 “赋值”.


伪代码如下:

boolean CAS(A, B, V) {
  if (A == V) {
    V = B;
    return true;
  }
  return false;
}

CAS最特殊的地方在于, 上述过程, 并非是通过一段代码实现的, 而是通过一条CPU指令完成的, 该指令是具有原子性的, 是线程安全的.


2. 基于CAS实现的原子类

Java标准库中提供了基于CAS所实现的 “原子类”, 这些类的类名以Atomic开头,针对常用的 int, long 等类型进行了封装, 它们可以基于CAS的方式进行修改, 并且保证线程安全性.

73d8c9be8b2a4960a39693770de0ac9a.png

方法 解释
addAndGet(int delta) i += delta
decrementAndGet() –i
getAndDecrement() i–
incrementAndGet() ++i
getAndIncrement() i++

这里举个例子, 典型的就是 AtomicInteger 类, 要实现多线程自增同一个变量, 其中的 getAndIncrement 相当于 i++ 操作.

import java.util.concurrent.atomic.AtomicInteger;
public class TestDemo25 {
    // 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger(0);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count.get());
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

上面的代码就是基于CAS实现的++操作, 不会存在线程安全问题, 这里既能够保证线程安全, 又要比 synchronized高效, synchronized会涉及到锁的竞争, 两个线程要相互等待, CAS不涉及到线程阻塞等待.

上面所使用的AtomicInteger类方法getAndIncrement实现的伪代码如下:

class AtomicInteger {
    private int value;//保存的值
    //自增操作
    public int getAndIncrement() {
        int oldValue = value;
        while ( CAS(value, oldValue, oldValue+1) != true) {
            oldValue = value;
        }
        return oldValue;
    }
}

对于CAS指令, 它的执行逻辑就是先判断value的值与oldValue的值是否相同, 如果相同就把oldValue+1的值进写入到value中(相当于将value的值加1), oldValue可以理解成寄存器里的值, 相当于是先把value变量内存中的值读至到寄存器当中, 在单线程环境中oldValue的值与value的值一定是相同, 但多线程环境下就不一定了, 因为value的值随时都有可能被其他线程修改, 比如执行完读取value到寄存器, 线程就发生切换了, 另外一个线程, 也进行修改了value的值, 然后当这个线程再被调度回来后, 再进行CAS判定 ,就认为value和oldValue不相等了.


接着往下看while循环, 该循环使用CAS指令是否成功为判断条件, 如果CAS成功了则退出循环, 此时value的值已经加1, 最终返回oldValue; 如果CAS指令失败了; 这就说明有新线程提前对当前的value进行了++, value的值发生了改变, 这这时候就需要将新的value的值赋值给oldValue, 然后尝试重新进行CAS操作, 这样就能保证有几个线程操作, 那就自增几次, 从而也就保证了线程安全.


结合下图, 当两个线程现指令交错的情况, 理解基于CAS指令实现的多线程自增操作是如何保证线程安全的.

73d8c9be8b2a4960a39693770de0ac9a.png

3. 基于CAS实现自旋锁

CAS的应用场景处了实现原子类, 还能够实现自旋锁, 伪代码如下:

//自旋锁对象
public class SpinLock {
    //记录当前锁对象被哪个线程占用,为null,表示锁对象未被占用
    private Thread ownerv = null;
    public void lock(){
        // 通过 CAS 看当前锁是否被某个线程持有. 
        // 如果这个锁已经被别的线程持有,那么就自旋等待. 
        // 如果这个锁没有被别的线程持有,那么就把 owner 设为当前尝试加锁的线程. 
        while(!CAS(this.owner, null, Thread.currentThread())){
        }
   }
    public void unlock (){
        this.owner = null;
   }
}

上面CAS与自旋锁的逻辑为了监测当前锁对象是否被线程占用, CAS监测当前的owner是否是null, 如果是null, 就进行交换, 也就是把当前线程的引用赋值给owner, 此时循环结束, 退出lock方法, 加锁就完成了.


如果当前锁, 已经被别的线程占用了, CAS就会发现, this.owner不是null, CAS就不会产生赋值, 也同时返回false, 循环继续执行, 并进行下次判定.


解锁的逻辑简单了, 将占用锁对象的线程(ownerv)置为null即可.

4. ABA问题

CAS指令操作的核心的检查value和oldValue是否相同, 如果相同, 就视为value中途没有被修改过, 所以进行下一步交换操作是没问题的, 在大部分情况下都能保证线程安全.


但这里有一种非常极端的情况, 这里说到的相同, value的值可能是没改过的, 还有可能是value的值被修改后又被改回到原来的值, 比如把value的值设为A的话, CAS判定value为A, 此时可能确实value始终是A, 也可能是value本来是A, 然后被改成了B, 最后又还原成了A.


上数说到的极端情况就是CAS中的ABA问题, 在一些极端场景下就会有bug存在, 比如下面的场景.


有一天, 滑稽老铁要到ATM机去取款, 假设当前滑稽的账户余额1000, 滑稽准备取500, 当滑稽老铁按下取款的这一瞬间, 机器卡了一下, 滑稽下意识就多按了一下, 如果考虑使用CAS的方式来扣款, 系统扣款的情况可能是下图所示:

73d8c9be8b2a4960a39693770de0ac9a.png

正常情况下, 即使按下两次取款按钮最终的结果也是正常的, 但考虑一种极端情况, 如果在第一次CAS成功后的一瞬间, 滑稽老铁的朋友又给给滑稽转账了500, 导致第一次CAS扣款500后的余额从500又变回到了1000, 然后紧接着第二次CAS操作也会执行成功, 又成功扣款500, 最终余额变成了500, 这种结果显然是不合理的, 而正确的程序应该是第二次CAS仍然失败, 最终余额为1000元.

73d8c9be8b2a4960a39693770de0ac9a.png

上述描述场景是极端的情况, 发生的概率是非常非常低的, 一方面, 恰好滑稽这边多按了几次产生多个扣款操作, 另一方面, 又赶巧在这个非常极限的时间内, 有人转账了一样的金额…


不过上述ABA问题在极端可能下造成的bug也是有办法解决的, 可以针对当前问题引入一个版本号, 假设初始版本号是1, 版本号只能增加不能减少, 每次修改版本号都+1, 然后进行CAS的时候, 就不是以金额值为基准了, 而是以版本号为基准, 在进行CAS操作之前, 都要对版本号进行验证, 如果版本号与之前加载的版本号不同, 则放弃此次CAS指令操作, 看下图理解, 这样最终的结果就是正确的了.73d8c9be8b2a4960a39693770de0ac9a.png


二. JUC中的常见类

Java中的JUC就是来自java.util.concurrent包下的一些标准类或者接口, 放的都是并发编程(多线程)相关的组件.

1. Callable接口

常见的创建线程的方式有两种方式, 第一种方法是直接继承Thread类, 重写run方法, 第二种方法是实现Runnable接口, 然后还是要靠Thread类的构造器, 把Runnable传进去, 最终调用的就是Runnable的run方法。; 和Runnable类似, 我们还可以通过Callable接口描述一个任务配合FutureTask类来创建线程, 和Runnable不同的是, Callable接口配合FutureTask类所创建的线程其中的任务是可以带有返回值的, 而一开始提到的那两种方式任务是不支持带返回值的.


Callable接口中有一个call方法(返回值是泛型参数), 就相当于Runnable接口中的run方法(无返回值), FutureTask可用于异步获取执行结果或取消执行任务的场景, 通过传入Runnable或者Callable的任务给FutureTask, 直接调用其run方法或者放入线程池执行, 之后可以在外部通过FutureTask的get方法异步获取执行结果, 如果任务还没有执行完毕, get方法会阻塞直到任务返回结果.


理解FutureTask可以为想象去吃麻辣烫, 当餐点好后, 后厨就开始做了, 同时前台会给你一张 “小票”, 这个小票就是FutureTask, 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没.


使用Thread类的构造器创建线程的时候, 传入的引用不能是Callable类型的, 而应该是FutrueTask类型, 因为构造器中传入的任务类型需要是一个Runnable类,Callable与Runnable是没有直接关系的, 但FutrueTask类实现了Runnable类, 所以要想使用Callable创建线程, 我们就需要先把实现Callable接口的对象引用传给FutrueTask类的实例对象, 再将FutrueTask实例传入线程构造器中.

73d8c9be8b2a4960a39693770de0ac9a.png

总结一下就是, 我们可以用Callable用来描述任务, FutureTask类用来管理Callable任务的执行结果.

比如, 我们使用Callable来计算 1 + 2 + 3 + … + 1000 的值, 并通过返回值的方式获取执行结果.

代码示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestDemo26 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 使用 Callable 来计算 1 + 2 + 3 + ... + 1000
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i <= 1000; i++) {
                    sum += i;
                }
                return sum;
            }
        };
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread t = new Thread(futureTask);
        t.start();
        //获取执行结果
        Integer sum = futureTask.get();
        System.out.println(sum);
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

2. ReentrantLock类(可重入锁)

ReentrantLock是除了synchronized外标准库给我们提供的另一种可重入锁, 与synchronized不同的是, ReentrantLock是通过lock方法加锁,unlock方法解锁, 相比于synchronized直接基于代码块的方式来加锁解锁更加传统.


正是由于加锁解锁两个操作是分开的, 所以代码的写法上需要格外注意, 一方面lock后如果之后的工作代码比较长久容易忘记去unlock从而造成死锁, 另一方面加锁后解锁前中间的代码万一出了问题(比如直接return或者出现异常), 都可能导致不能顺利执行unlock造成死锁.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
// working
reentrantLock.unlock();

所以使用ReentrantLock类时, 一般要搭配finally使用, 将unlock放入到finally保证unlock一定会执行.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
try {
    // working
} finally {
    reentrantLock.unlock();
}
reentrantLock.unlock();

🎯ReentrantLock相较于synchronized的优势:


ReentrantKLock提供了公平锁版本的实现, 而synchronized只实现了非公平锁, ReentrantKLock默认是非公平锁, 而公平锁版本只需要指定fair参数为true即可..

73d8c9be8b2a4960a39693770de0ac9a.png

对于synchronized来说, 提供的加锁操作就是"死等", 也就是说如果锁已经被占用, 只要锁没释放就需要一直等下去,而ReentrantLock提供了更灵活的等待方式: tryLeock.

方法 解释
boolean trylock() 无参数, 能加锁就加, 加不上就放弃
boolean trylock(long timeout, TimeUnit unit) 有参数, 超过指定时间, 加不上锁就放弃

ReentrantLock提供了一个更方便的等待通知机制, synchronized搭配的是wait, notify, 当我们notify的时候是随即唤醒一个wait状态的线程; 而ReentrantLock搭配一个Condition类, 进行唤醒的时候可以唤醒指定线程.

3. Semaphore类(信号量)

Java中信号量(Semaphore)是把操作系统原生的信号量封装了一下, 本质就是一个计数器, 描述了 “可用资源的个数”,主要涉及到两个操作

P操作: 申请一个可用资源, 计数器 -1.

V操作: 释放一个可用资源, 计数器 +1.


如果计数器为0了, 继续Р操作, 就会出现阻塞等待的情况.


🍂举个例子来理解信号量:


会开车的应该经常会碰到, 停车, 停车场门口有一个灯牌, 会显示停车位还剩余多少个, 每进去一辆车, 显示的停车位数量就-1, 就相当于进行了一次P操作, 每出去一辆车, 显示的停车位数量就+1, 就相当于进行了一次V操作; 而当停车场的剩余车位为0时, 显示的停车位数量就为0了, 此时如果还有车想停, 要么在这里等, 要么就去其他停车场.


🎯Semaphore类的常用方法:


构造方法

public Semaphore(int permits) 构造可用资源为permits个的信号量对象
public Semaphore(int permits, boolean fair) 构造可用资源为permits个的信号量对象, 并指定信号量是否是公平性质的

PV方法

Semaphore的PV操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.

public void acquire() throws InterruptedException 申请可用资源
public void release() 释放可用资源

代码示例:


创建Semaphore示例, 初始化为4, 表示有4个可用资源.

acquire方法表示申请资源(P操作), release方法表示释放资源(V操作).

创建20个线程, 每个线程都尝试申请资源, sleep1秒之后, 释放资源. 观察程序的执行效果.

import java.util.concurrent.Semaphore;
public class Test {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(4);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("申请资源");
                    semaphore.acquire();
                    System.out.println("我获取到资源了");
                    Thread.sleep(1000);
                    System.out.println("我释放资源了");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 20; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

考虑这里一个场景, 假设有一个计数器初始值为1的信号量, 针对这个信号量的值, 就只有1和0两种取值, 因为信号量不能是负的,


执行一次Р操作, 1->0

执行一次V操作, 0->1

如果已经进行一次Р操作了, 继续进行Р操作, 就会阻塞等待, 这是不是和锁的功效有点类似呢?


锁就可以视为 “二元信号量”, 可用资源就1个, 计数器的取值非0即1, 可以说, 锁是信号量的一种特殊情况, 信号量就把锁推广到了一般情况, 描述了可用资源更多的时候是如何处理的.


所以说, 计数器初始值为1的信号量就可以当成锁来使用, 这里我们编写一个代码实现两个线程增加同一个变量, 使用Semphore来控制线程安全.

import java.util.concurrent.Semaphore;
public class TestDemo27 {
    // 编写代码实现两个线程增加同一个变量, 使用 Semphore 来控制线程安全.
    public static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count);
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

4. CountDownLatch同步工具类

CountDownLatch是一个同步工具类, 用来协调多个线程之间的同步, 或者说起到线程之间的通信(而不是用作互斥的作用).


CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后, 再继续执行; 使用一个计数器进行实现, 计数器初始值为线程的数量; 当每一个线程完成自己任务后, 计数器的值就会-1, 当计数器的值为0时, 表示所有的线程都已经完成一些任务, 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务.


想象一个场景进行理解, 假设有一场跑步比赛, 比赛开始时间是明确的, 裁判的发令枪响了比赛就开始了, 但结束时间是不确定的, 只有当所有的选手都冲过了终点线才算结束, 这里的运动员就相当于线程, 而CountDownLatch类用判断什么时候最后一个远动员冲过终点线, 即这些线程在什么时候可以全部执行结束.


🎯CountDownLatch类常用方法:


构造方法

public CountDownLatch(int count) 构造实例对象, count表示CountDownLatch对象中计数器的值

普通方法

public void await() throws InterruptedException 使所处的线程进入阻塞等待, 直到计数器的值清零
public void countDown() 将计数器的值减1
public long getCount() 获取计数器最初的值

代码示例:


10个选手依次就位, 哨声响才同时出发, 所有选手都通过终点, 比赛结束.

构造CountDownLatch实例, 初始化10表示有10个任务需要完成(10个选手参加比赛).

每个任务执行完毕, 都调用latch.countDown(), 在CountDownLatch内部的计数器同时自减(有一个选手冲过了终点线).

主线程中使用 latch.await(), 阻塞等待所有任务执行完毕, 相当于计数器为0了(所有选手都冲过了终点线比赛结束).

import java.util.concurrent.CountDownLatch;
public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        //构造方法的参数表示有几个选手
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + " 到达终点");
                    latch.countDown(); //调用countDown的次数和个数一致
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t.start();
        }
        //裁判要等所有线程到达
        //当这些线程没有执行完的时候,await就会阻塞,所有线程执行完了,await 才返回
        latch.await();
        System.out.println("比赛结束");
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

实际开发中这样的场景也是存在的, 比如多线程下载(迅雷, steam等下载器), 当我们下载一个比较大的文件资源(电影), 通过多线程下载就可以提高下载速度, 把一个大的文件拆成多个部分安排多个线程下载, 每个线程负责下载其中的一个部分, 等到是所有的线程都完成自己的下载, 才算把整个文件下载完, 这里就可以用到CountDownLatch来判断文件整体是否下载完毕, 多线程下载是充分利用了带宽(下载是IO操作, 和CPU关系不大).


目录
相关文章
|
5天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
16天前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
79 6
【Java学习】多线程&JUC万字超详解
|
1天前
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。
|
28天前
|
安全 Java 调度
|
28天前
|
安全 Java 程序员
线程安全与 Vector 类的分析
【8月更文挑战第22天】
20 4
|
28天前
|
设计模式 Java 调度
JUC线程池: ScheduledThreadPoolExecutor详解
`ScheduledThreadPoolExecutor`是Java标准库提供的一个强大的定时任务调度工具,它让并发编程中的任务调度变得简单而可靠。这个类的设计兼顾了灵活性与功能性,使其成为实现复杂定时任务逻辑的理想选择。不过,使用时仍需留意任务的执行时间以及系统的实际响应能力,以避免潜在的调度问题影响应用程序的行为。
49 1
|
21天前
|
Java API 调度
JUC线程池: FutureTask详解
总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
25 0
|
24天前
|
安全 Java API
Java多线程编程:使用Atomic类实现原子操作
在Java多线程环境中,共享资源的并发访问可能导致数据不一致。传统的同步机制如`synchronized`关键字或显式锁虽能保障数据一致性,但在高并发场景下可能导致线程阻塞和性能下降。为此,Java提供了`java.util.concurrent.atomic`包下的原子类,利用底层硬件的原子操作确保变量更新的原子性,实现无锁线程安全。
13 0
|
Java 程序员 Spring
Java并发必知必会第三弹:用积木讲解ABA原理
Java并发必知必会第三弹:用积木讲解ABA原理
140 0
|
2天前
|
Java
深入理解Java中的多线程编程
本文将探讨Java多线程编程的核心概念和技术,包括线程的创建与管理、同步机制以及并发工具类的应用。我们将通过实例分析,帮助读者更好地理解和应用Java多线程编程,提高程序的性能和响应能力。
15 4