Java多线程之CAS中的ABA问题与JUC的常见类

简介: Java多线程之CAS中的ABA问题与JUC的常见类

一. CAS指令与ABA问题

1. 解析CAS

CAS即compare and awap, 字面意思是比较并交换, 具体说就是将寄存器或者某个内存上的值A与另一个内存上的值V进行比较, 如果相同就将B与需要修改的值V进行交换, 并返回交换是否成功的结果.

我们假设内存中的原数据V, 旧的预期值A, 需要修改的新值B, 具体涉及下面三个操作.

73d8c9be8b2a4960a39693770de0ac9a.png

比较A与V是否相等(比较).

如果比较相等, 将B写入V(交换), 不相等则不执行任何操作.

返回操作是否成功.

在上述交换过程中, 大多数情况下并不关心B后续的情况, 更关心的是V这个变量的情况, 这里的交换, 也可以近似理解成 “赋值”.


伪代码如下:

boolean CAS(A, B, V) {
  if (A == V) {
    V = B;
    return true;
  }
  return false;
}

CAS最特殊的地方在于, 上述过程, 并非是通过一段代码实现的, 而是通过一条CPU指令完成的, 该指令是具有原子性的, 是线程安全的.


2. 基于CAS实现的原子类

Java标准库中提供了基于CAS所实现的 “原子类”, 这些类的类名以Atomic开头,针对常用的 int, long 等类型进行了封装, 它们可以基于CAS的方式进行修改, 并且保证线程安全性.

73d8c9be8b2a4960a39693770de0ac9a.png

方法 解释
addAndGet(int delta) i += delta
decrementAndGet() –i
getAndDecrement() i–
incrementAndGet() ++i
getAndIncrement() i++

这里举个例子, 典型的就是 AtomicInteger 类, 要实现多线程自增同一个变量, 其中的 getAndIncrement 相当于 i++ 操作.

import java.util.concurrent.atomic.AtomicInteger;
public class TestDemo25 {
    // 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger(0);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count.get());
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

上面的代码就是基于CAS实现的++操作, 不会存在线程安全问题, 这里既能够保证线程安全, 又要比 synchronized高效, synchronized会涉及到锁的竞争, 两个线程要相互等待, CAS不涉及到线程阻塞等待.

上面所使用的AtomicInteger类方法getAndIncrement实现的伪代码如下:

class AtomicInteger {
    private int value;//保存的值
    //自增操作
    public int getAndIncrement() {
        int oldValue = value;
        while ( CAS(value, oldValue, oldValue+1) != true) {
            oldValue = value;
        }
        return oldValue;
    }
}

对于CAS指令, 它的执行逻辑就是先判断value的值与oldValue的值是否相同, 如果相同就把oldValue+1的值进写入到value中(相当于将value的值加1), oldValue可以理解成寄存器里的值, 相当于是先把value变量内存中的值读至到寄存器当中, 在单线程环境中oldValue的值与value的值一定是相同, 但多线程环境下就不一定了, 因为value的值随时都有可能被其他线程修改, 比如执行完读取value到寄存器, 线程就发生切换了, 另外一个线程, 也进行修改了value的值, 然后当这个线程再被调度回来后, 再进行CAS判定 ,就认为value和oldValue不相等了.


接着往下看while循环, 该循环使用CAS指令是否成功为判断条件, 如果CAS成功了则退出循环, 此时value的值已经加1, 最终返回oldValue; 如果CAS指令失败了; 这就说明有新线程提前对当前的value进行了++, value的值发生了改变, 这这时候就需要将新的value的值赋值给oldValue, 然后尝试重新进行CAS操作, 这样就能保证有几个线程操作, 那就自增几次, 从而也就保证了线程安全.


结合下图, 当两个线程现指令交错的情况, 理解基于CAS指令实现的多线程自增操作是如何保证线程安全的.

73d8c9be8b2a4960a39693770de0ac9a.png

3. 基于CAS实现自旋锁

CAS的应用场景处了实现原子类, 还能够实现自旋锁, 伪代码如下:

//自旋锁对象
public class SpinLock {
    //记录当前锁对象被哪个线程占用,为null,表示锁对象未被占用
    private Thread ownerv = null;
    public void lock(){
        // 通过 CAS 看当前锁是否被某个线程持有. 
        // 如果这个锁已经被别的线程持有,那么就自旋等待. 
        // 如果这个锁没有被别的线程持有,那么就把 owner 设为当前尝试加锁的线程. 
        while(!CAS(this.owner, null, Thread.currentThread())){
        }
   }
    public void unlock (){
        this.owner = null;
   }
}

上面CAS与自旋锁的逻辑为了监测当前锁对象是否被线程占用, CAS监测当前的owner是否是null, 如果是null, 就进行交换, 也就是把当前线程的引用赋值给owner, 此时循环结束, 退出lock方法, 加锁就完成了.


如果当前锁, 已经被别的线程占用了, CAS就会发现, this.owner不是null, CAS就不会产生赋值, 也同时返回false, 循环继续执行, 并进行下次判定.


解锁的逻辑简单了, 将占用锁对象的线程(ownerv)置为null即可.

4. ABA问题

CAS指令操作的核心的检查value和oldValue是否相同, 如果相同, 就视为value中途没有被修改过, 所以进行下一步交换操作是没问题的, 在大部分情况下都能保证线程安全.


但这里有一种非常极端的情况, 这里说到的相同, value的值可能是没改过的, 还有可能是value的值被修改后又被改回到原来的值, 比如把value的值设为A的话, CAS判定value为A, 此时可能确实value始终是A, 也可能是value本来是A, 然后被改成了B, 最后又还原成了A.


上数说到的极端情况就是CAS中的ABA问题, 在一些极端场景下就会有bug存在, 比如下面的场景.


有一天, 滑稽老铁要到ATM机去取款, 假设当前滑稽的账户余额1000, 滑稽准备取500, 当滑稽老铁按下取款的这一瞬间, 机器卡了一下, 滑稽下意识就多按了一下, 如果考虑使用CAS的方式来扣款, 系统扣款的情况可能是下图所示:

73d8c9be8b2a4960a39693770de0ac9a.png

正常情况下, 即使按下两次取款按钮最终的结果也是正常的, 但考虑一种极端情况, 如果在第一次CAS成功后的一瞬间, 滑稽老铁的朋友又给给滑稽转账了500, 导致第一次CAS扣款500后的余额从500又变回到了1000, 然后紧接着第二次CAS操作也会执行成功, 又成功扣款500, 最终余额变成了500, 这种结果显然是不合理的, 而正确的程序应该是第二次CAS仍然失败, 最终余额为1000元.

73d8c9be8b2a4960a39693770de0ac9a.png

上述描述场景是极端的情况, 发生的概率是非常非常低的, 一方面, 恰好滑稽这边多按了几次产生多个扣款操作, 另一方面, 又赶巧在这个非常极限的时间内, 有人转账了一样的金额…


不过上述ABA问题在极端可能下造成的bug也是有办法解决的, 可以针对当前问题引入一个版本号, 假设初始版本号是1, 版本号只能增加不能减少, 每次修改版本号都+1, 然后进行CAS的时候, 就不是以金额值为基准了, 而是以版本号为基准, 在进行CAS操作之前, 都要对版本号进行验证, 如果版本号与之前加载的版本号不同, 则放弃此次CAS指令操作, 看下图理解, 这样最终的结果就是正确的了.73d8c9be8b2a4960a39693770de0ac9a.png


二. JUC中的常见类

Java中的JUC就是来自java.util.concurrent包下的一些标准类或者接口, 放的都是并发编程(多线程)相关的组件.

1. Callable接口

常见的创建线程的方式有两种方式, 第一种方法是直接继承Thread类, 重写run方法, 第二种方法是实现Runnable接口, 然后还是要靠Thread类的构造器, 把Runnable传进去, 最终调用的就是Runnable的run方法。; 和Runnable类似, 我们还可以通过Callable接口描述一个任务配合FutureTask类来创建线程, 和Runnable不同的是, Callable接口配合FutureTask类所创建的线程其中的任务是可以带有返回值的, 而一开始提到的那两种方式任务是不支持带返回值的.


Callable接口中有一个call方法(返回值是泛型参数), 就相当于Runnable接口中的run方法(无返回值), FutureTask可用于异步获取执行结果或取消执行任务的场景, 通过传入Runnable或者Callable的任务给FutureTask, 直接调用其run方法或者放入线程池执行, 之后可以在外部通过FutureTask的get方法异步获取执行结果, 如果任务还没有执行完毕, get方法会阻塞直到任务返回结果.


理解FutureTask可以为想象去吃麻辣烫, 当餐点好后, 后厨就开始做了, 同时前台会给你一张 “小票”, 这个小票就是FutureTask, 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没.


使用Thread类的构造器创建线程的时候, 传入的引用不能是Callable类型的, 而应该是FutrueTask类型, 因为构造器中传入的任务类型需要是一个Runnable类,Callable与Runnable是没有直接关系的, 但FutrueTask类实现了Runnable类, 所以要想使用Callable创建线程, 我们就需要先把实现Callable接口的对象引用传给FutrueTask类的实例对象, 再将FutrueTask实例传入线程构造器中.

73d8c9be8b2a4960a39693770de0ac9a.png

总结一下就是, 我们可以用Callable用来描述任务, FutureTask类用来管理Callable任务的执行结果.

比如, 我们使用Callable来计算 1 + 2 + 3 + … + 1000 的值, 并通过返回值的方式获取执行结果.

代码示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestDemo26 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 使用 Callable 来计算 1 + 2 + 3 + ... + 1000
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i <= 1000; i++) {
                    sum += i;
                }
                return sum;
            }
        };
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread t = new Thread(futureTask);
        t.start();
        //获取执行结果
        Integer sum = futureTask.get();
        System.out.println(sum);
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

2. ReentrantLock类(可重入锁)

ReentrantLock是除了synchronized外标准库给我们提供的另一种可重入锁, 与synchronized不同的是, ReentrantLock是通过lock方法加锁,unlock方法解锁, 相比于synchronized直接基于代码块的方式来加锁解锁更加传统.


正是由于加锁解锁两个操作是分开的, 所以代码的写法上需要格外注意, 一方面lock后如果之后的工作代码比较长久容易忘记去unlock从而造成死锁, 另一方面加锁后解锁前中间的代码万一出了问题(比如直接return或者出现异常), 都可能导致不能顺利执行unlock造成死锁.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
// working
reentrantLock.unlock();

所以使用ReentrantLock类时, 一般要搭配finally使用, 将unlock放入到finally保证unlock一定会执行.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
try {
    // working
} finally {
    reentrantLock.unlock();
}
reentrantLock.unlock();

🎯ReentrantLock相较于synchronized的优势:


ReentrantKLock提供了公平锁版本的实现, 而synchronized只实现了非公平锁, ReentrantKLock默认是非公平锁, 而公平锁版本只需要指定fair参数为true即可..

73d8c9be8b2a4960a39693770de0ac9a.png

对于synchronized来说, 提供的加锁操作就是"死等", 也就是说如果锁已经被占用, 只要锁没释放就需要一直等下去,而ReentrantLock提供了更灵活的等待方式: tryLeock.

方法 解释
boolean trylock() 无参数, 能加锁就加, 加不上就放弃
boolean trylock(long timeout, TimeUnit unit) 有参数, 超过指定时间, 加不上锁就放弃

ReentrantLock提供了一个更方便的等待通知机制, synchronized搭配的是wait, notify, 当我们notify的时候是随即唤醒一个wait状态的线程; 而ReentrantLock搭配一个Condition类, 进行唤醒的时候可以唤醒指定线程.

3. Semaphore类(信号量)

Java中信号量(Semaphore)是把操作系统原生的信号量封装了一下, 本质就是一个计数器, 描述了 “可用资源的个数”,主要涉及到两个操作

P操作: 申请一个可用资源, 计数器 -1.

V操作: 释放一个可用资源, 计数器 +1.


如果计数器为0了, 继续Р操作, 就会出现阻塞等待的情况.


🍂举个例子来理解信号量:


会开车的应该经常会碰到, 停车, 停车场门口有一个灯牌, 会显示停车位还剩余多少个, 每进去一辆车, 显示的停车位数量就-1, 就相当于进行了一次P操作, 每出去一辆车, 显示的停车位数量就+1, 就相当于进行了一次V操作; 而当停车场的剩余车位为0时, 显示的停车位数量就为0了, 此时如果还有车想停, 要么在这里等, 要么就去其他停车场.


🎯Semaphore类的常用方法:


构造方法

public Semaphore(int permits) 构造可用资源为permits个的信号量对象
public Semaphore(int permits, boolean fair) 构造可用资源为permits个的信号量对象, 并指定信号量是否是公平性质的

PV方法

Semaphore的PV操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.

public void acquire() throws InterruptedException 申请可用资源
public void release() 释放可用资源

代码示例:


创建Semaphore示例, 初始化为4, 表示有4个可用资源.

acquire方法表示申请资源(P操作), release方法表示释放资源(V操作).

创建20个线程, 每个线程都尝试申请资源, sleep1秒之后, 释放资源. 观察程序的执行效果.

import java.util.concurrent.Semaphore;
public class Test {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(4);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("申请资源");
                    semaphore.acquire();
                    System.out.println("我获取到资源了");
                    Thread.sleep(1000);
                    System.out.println("我释放资源了");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 20; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

考虑这里一个场景, 假设有一个计数器初始值为1的信号量, 针对这个信号量的值, 就只有1和0两种取值, 因为信号量不能是负的,


执行一次Р操作, 1->0

执行一次V操作, 0->1

如果已经进行一次Р操作了, 继续进行Р操作, 就会阻塞等待, 这是不是和锁的功效有点类似呢?


锁就可以视为 “二元信号量”, 可用资源就1个, 计数器的取值非0即1, 可以说, 锁是信号量的一种特殊情况, 信号量就把锁推广到了一般情况, 描述了可用资源更多的时候是如何处理的.


所以说, 计数器初始值为1的信号量就可以当成锁来使用, 这里我们编写一个代码实现两个线程增加同一个变量, 使用Semphore来控制线程安全.

import java.util.concurrent.Semaphore;
public class TestDemo27 {
    // 编写代码实现两个线程增加同一个变量, 使用 Semphore 来控制线程安全.
    public static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count);
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

4. CountDownLatch同步工具类

CountDownLatch是一个同步工具类, 用来协调多个线程之间的同步, 或者说起到线程之间的通信(而不是用作互斥的作用).


CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后, 再继续执行; 使用一个计数器进行实现, 计数器初始值为线程的数量; 当每一个线程完成自己任务后, 计数器的值就会-1, 当计数器的值为0时, 表示所有的线程都已经完成一些任务, 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务.


想象一个场景进行理解, 假设有一场跑步比赛, 比赛开始时间是明确的, 裁判的发令枪响了比赛就开始了, 但结束时间是不确定的, 只有当所有的选手都冲过了终点线才算结束, 这里的运动员就相当于线程, 而CountDownLatch类用判断什么时候最后一个远动员冲过终点线, 即这些线程在什么时候可以全部执行结束.


🎯CountDownLatch类常用方法:


构造方法

public CountDownLatch(int count) 构造实例对象, count表示CountDownLatch对象中计数器的值

普通方法

public void await() throws InterruptedException 使所处的线程进入阻塞等待, 直到计数器的值清零
public void countDown() 将计数器的值减1
public long getCount() 获取计数器最初的值

代码示例:


10个选手依次就位, 哨声响才同时出发, 所有选手都通过终点, 比赛结束.

构造CountDownLatch实例, 初始化10表示有10个任务需要完成(10个选手参加比赛).

每个任务执行完毕, 都调用latch.countDown(), 在CountDownLatch内部的计数器同时自减(有一个选手冲过了终点线).

主线程中使用 latch.await(), 阻塞等待所有任务执行完毕, 相当于计数器为0了(所有选手都冲过了终点线比赛结束).

import java.util.concurrent.CountDownLatch;
public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        //构造方法的参数表示有几个选手
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + " 到达终点");
                    latch.countDown(); //调用countDown的次数和个数一致
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t.start();
        }
        //裁判要等所有线程到达
        //当这些线程没有执行完的时候,await就会阻塞,所有线程执行完了,await 才返回
        latch.await();
        System.out.println("比赛结束");
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

实际开发中这样的场景也是存在的, 比如多线程下载(迅雷, steam等下载器), 当我们下载一个比较大的文件资源(电影), 通过多线程下载就可以提高下载速度, 把一个大的文件拆成多个部分安排多个线程下载, 每个线程负责下载其中的一个部分, 等到是所有的线程都完成自己的下载, 才算把整个文件下载完, 这里就可以用到CountDownLatch来判断文件整体是否下载完毕, 多线程下载是充分利用了带宽(下载是IO操作, 和CPU关系不大).


目录
相关文章
|
1天前
|
Java 编译器
Java Character 类
4月更文挑战第13天
|
1天前
|
存储 安全 Java
Java中的容器,线程安全和线程不安全
Java中的容器,线程安全和线程不安全
7 1
|
2天前
|
存储 Java
Java基础教程(7)-Java中的面向对象和类
【4月更文挑战第7天】Java是面向对象编程(OOP)语言,强调将事务抽象成对象。面向对象与面向过程的区别在于,前者通过对象间的交互解决问题,后者按步骤顺序执行。类是对象的模板,对象是类的实例。创建类使用`class`关键字,对象通过`new`运算符动态分配内存。方法包括构造函数和一般方法,构造函数用于对象初始化,一般方法处理逻辑。方法可以有0个或多个参数,可变参数用`类型...`定义。`this`关键字用于访问当前对象的属性。
|
3天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
4天前
|
Java 程序员 编译器
Java中的线程同步与锁优化策略
【4月更文挑战第14天】在多线程编程中,线程同步是确保数据一致性和程序正确性的关键。Java提供了多种机制来实现线程同步,其中最常用的是synchronized关键字和Lock接口。本文将深入探讨Java中的线程同步问题,并分析如何通过锁优化策略提高程序性能。我们将首先介绍线程同步的基本概念,然后详细讨论synchronized和Lock的使用及优缺点,最后探讨一些锁优化技巧,如锁粗化、锁消除和读写锁等。
|
6天前
|
Java
探秘jstack:解决Java应用线程问题的利器
探秘jstack:解决Java应用线程问题的利器
14 1
探秘jstack:解决Java应用线程问题的利器
|
6天前
|
Java 调度 开发者
Java 21时代的标志:虚拟线程带来的并发编程新境界
Java 21时代的标志:虚拟线程带来的并发编程新境界
14 0
|
6天前
|
Java Shell
Java 21颠覆传统:未命名类与实例Main方法的编码变革
Java 21颠覆传统:未命名类与实例Main方法的编码变革
10 0
|
6天前
|
Java
Java 15 神秘登场:隐藏类解析未知领域
Java 15 神秘登场:隐藏类解析未知领域
10 0
|
9天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信