Java——多线程高并发系列之理解CAS、原子变量类的使用

简介: Java——多线程高并发系列之理解CAS、原子变量类的使用

文章目录:


1.CAS

2.原子变量类

2.1AtomicInteger

2.2 AtomicLong

2.3 AtomicIntegerArray

2.4 AtomicIntegerFieldUpdater

2.5 AtomicReference

2.6 AtomicStampedReference

1.CAS


CAS(Compare And Swap)是由硬件实现的。CAS 可以将 read- modify - write 这类的操作转换为原子操作。

i++自增操作包括三个子操作:  从主内存读取 i 变量值;对 i 的值加 1;再把加 1 之后的值保存到主内存。

CAS原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。

下面我说一下,我对上面这张图中CAS算法的理解:首先在主内存有一个所有线程都共享的count变量,它的值初始化为10

1.    线程1从主内存中读取count变量的值到自己的工作内存中,此时count10。(这个值我们记为:线程1期望的count值、最初读取到的count值)

2.    线程1对自己工作内存中的count的值进行自增操作,此时count11。(这个值我们记为:线程1更新之后的count值)

3.    这个时候,当线程1还未执行到第三步更新count的值到主内存中的时候,线程2突然抢到了CPU执行权,它进来从主内存中读取count变量的值到自己的工作内存中,因为线程1还未更新,所以此时count仍为10。(这个值我们记为:线程2期望的count值、最初读取到的count值)

4.    线程2对自己工作内存中的count的值进行自增操作,此时count11。(这个值我们记为:线程2更新之后的count值)

5.    在线程2要将自己工作内存中的count值更新到主内存之前,要执行一部重要的操作!!!就是再次读取主内存中共享变量count的值,如果此时读取到的这个值与最开始线程2期望的count值相等,那么就顺利更新count的值;如果不相等,就撤销放弃本次count++操作。

6.    那么线程2最开始期望的count值为10,而此时从主内存中读取到的count值还是10,一样,所以就更新,将自己工作内存中的count值更新到主内存中,此时count11

7.    那么线程2执行完了,此时线程1重新获得CPU执行权,它已经完成了count++的前两步了,就剩最后一步更新操作了,那么它此时同线程2一样,再次从主内存中读取共享变量count的值,一样就更新、不一样就撤销放弃操作。然而count的值已经被线程2更新过了,此时为11,而线程1最开始期望的count的值为10,两次读取到count的值不一样了!!!所以线程1就会自动撤销本次count++操作。

8.    最终,本次线程12count变量执行++操作的结果:线程1失败、线程2成功,最后count的值只增加了1次,为11


下面,我再来说一下什么是CAS中的ABA问题。

·       CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。

·       实际上这种假设不一定总是成立的。假如说有共享变量 count=100

·       A线程将 count 修改为 200

·       B线程将 count 修改为 300

·       C线程将 count 修改为 100

·       当前线程看到的count变量的值是100,这就与最初的count变量的值是一样的,那么是否就认为count变量的值没有被其他线程更新呢?显然不是啊,它明显的被AB两个线程更新过。

·       这就是CAS中的ABA问题,即共享变量经历了 A → B → A 的更新。

·       如果想要规避ABA问题,可以为共享变量引入一个修订好(或者叫时间戳),每次修改共享变量时,相应的修订号就会增加1ABA问题的过程就转变为:[A0] → [B1] → [A2] ,每次修改共享变量都会导致修订号的增加,通过修订号就可以准确的判断共享便是否被其他线程修改过。在原子类中,AtomicReference就会面临ABA问题的困扰,而AtomicStampedReference 可以很好的规避ABA问题。详细内容看下面的代码案例。


2.原子变量类


原子变量类基于CAS实现的,当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性。对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++。由于volatile只能保证可见性、无法保障原子性,原子变量类内部就是借助一个Volatile变量,并且保障了该变量的 read-modify-write 操作的原子性,有时把原子变量类看作增强的 volatile 变量。

原子变量类有 12 个,如:(这12个原子变量类全部位于JUC这个Java并发包的atomic子包下的)

分组

原子变量类

基础数据型

AtomicInteger AtomicLongAtomicBoolean

数组型

AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray

字段更新器

AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater

引用型

AtomicReferenceAtomicStampedReferenceAtomicMarkableReference

2.1AtomicInteger

package com.szh.volatiletest;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 使用原子类进行自增操作
 */
public class Test03 {
    public static void main(String[] args) throws InterruptedException {
        //在main线程中创建10个子线程
        for (int i = 0; i < 10; i++) {
            new MyThread().start();
        }
        Thread.sleep(1000);
        System.out.println(MyThread.count.get());
    }
    static class MyThread extends Thread {
        private static AtomicInteger count=new AtomicInteger();
        public static void addCount() {
            for (int i = 0; i < 1000; i++) {
                count.getAndIncrement();
            }
            System.out.println(Thread.currentThread().getName() + " count = " + count.get());
        }
        @Override
        public void run() {
            addCount();
        }
    }
}

首先是AtomicInteger,它其中的 getAndIncrement 方法就是 count++ 操作,get() 方法是获取最终的执行结果,可以看到10个子线程各自执行1000 count++ 操作,最终的结果正确无误:10000

2.2 AtomicLong

这里我是用 AtomicLong 定义一个简单的计数器,来模拟实现原子操作

main主线程中一共创建了1000个子线程,每个线程都看作一个请求,所以请求总数肯定是1000,然后我模拟偶数就算请求成功、奇数就算请求失败。那么这个时候,因为Java中的多线程是采用抢占式来调度的,所以这里成功和失败的比值一般来说不会是11(各500次),输出结果中,我们也看到了是494506,当然这个执行结果也不是唯一的。

package com.szh.atomiclong;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 使用原子变量类定义一个计数器
 * 该计数器,在整个程序中都能使用,所以这个计数器可以设计为单例
 */
public class Indicator {
    //构造方法私有化
    private Indicator() {}
    //定义一个私有的本类静态对象
    private static final Indicator INSTANCE=new Indicator();
    //提供一个公共静态方法,获取该类唯一的实例
    public static Indicator getInstance() {
        return INSTANCE;
    }
    //使用原子变量类
    private final AtomicLong requestCount=new AtomicLong(0); //记录请求总数
    private final AtomicLong successCount=new AtomicLong(0); //记录成功总数
    private final AtomicLong failureCount=new AtomicLong(0); //记录失败总数
    //有新的请求
    public void newRequestReceive() {
        requestCount.getAndIncrement();
    }
    //处理成功
    public void requestReceiveSuccess() {
        successCount.getAndIncrement();
    }
    //处理失败
    public void requestReceiveFialure() {
        failureCount.getAndIncrement();
    }
    //查看请求总数
    public long getRequestCount() {
        return requestCount.get();
    }
    //查看成功总数
    public long getSuccessCount() {
        return successCount.get();
    }
    //查看失败总数
    public long getFailureCount() {
        return failureCount.get();
    }
}
package com.szh.atomiclong;
import java.util.Random;
/**
 *
 */
public class Test {
    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //每个线程就是一个请求,请求总数加1
                    Indicator.getInstance().newRequestReceive();
                    int num=new Random().nextInt();
                    if (num % 2 == 0) { //模拟偶数为请求成功
                        Indicator.getInstance().requestReceiveSuccess();
                    }else { //模拟奇数为请求失败
                        Indicator.getInstance().requestReceiveFialure();
                    }
                }
            }).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印请求数
        System.out.println(Indicator.getInstance().getRequestCount()); //请求总数
        System.out.println(Indicator.getInstance().getSuccessCount()); //成功总数
        System.out.println(Indicator.getInstance().getFailureCount()); //失败总数
    }
}


2.3 AtomicIntegerArray


案例1AtomicIntegerArray中的常用方法。

package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
 * AtomicIntegerArray 原子数组类的基本操作
 */
public class Test01 {
    public static void main(String[] args) {
        //创建一个指定长度的原子数组
        AtomicIntegerArray array=new AtomicIntegerArray(10);
        System.out.println(array);
        //返回指定位置的元素
        System.out.println(array.get(0));
        System.out.println(array.get(8));
        //设置指定位置的元素
        array.set(0,20);
        //在设置某个位置元素的新值时,返回该元素的旧值
        System.out.println(array.getAndSet(1,66));
        System.out.println(array);
        //将某个位置元素的值,加上一个数,返回该元素的新值
        System.out.println(array.addAndGet(0,30));
        System.out.println(array.addAndGet(1,4));
        System.out.println(array);
        //CAS操作
        //如果数组中索引为0的元素的值为50,则将其替换为新值1,返回true
        //反之,如果该索引的元素的值不是50,则不进行替换,返回false
        System.out.println(array.compareAndSet(0,50,1));
        System.out.println(array);
        //自增/自减
        System.out.println(array.incrementAndGet(0)); // ++i
        System.out.println(array.getAndIncrement(1)); // i++
        System.out.println(array.decrementAndGet(2)); // --i
        System.out.println(array.getAndDecrement(3)); // i--
        System.out.println(array);
    }
}


案例2:定义一个原子数组,一个线程数组,10个线程分别将该原子数组中的每个元素自增1000次,所以最终得到的原子数组中,每个元素的值应该为:10*1000=10000

package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
 * 在多线程中使用 AtomicIntegerArray 原子数组类
 */
public class Test02 {
    //定义一个原子数组
    static AtomicIntegerArray array=new AtomicIntegerArray(10);
    public static void main(String[] args) {
        //定义一个线程数组
        Thread[] threads=new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new AddThread();
        }
        //开启子线程
        for (Thread thread : threads) {
            thread.start();
        }
        //在主线程中查看自增之后的原子数组中每个元素的值,这里需要等待所有子线程执行完毕之后再查看
        //所以这里可以使用join方法进行线程合并,把所有的子线程合并到当前主线程中
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //打印原子数组
        System.out.println(array);
    }
    //定义一个线程类,在这个类中修改原子数组
    static class AddThread extends Thread {
        @Override
        public void run() {
            //将原子数组中的每个元素自增1000次
            for (int j=0;j<1000;j++) {
                for (int i = 0; i < array.length(); i++) {
                    array.getAndIncrement(i % array.length());
                }
            }
        }
    }
}


2.4 AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater可以对原子整数字段进行更新,要求:

1.    字符必须使用 volatile 修饰,使线程之间可见。

2.    只能是实例变量,不能是静态变量,也不能使用 final 修饰。

在这个Java Bean中,我将age属性设置为volatile修饰,也就是说此时这个age对所有的线程都是可见的。

package com.szh.atominintegerfiledupdater;
/**
 *
 */
public class User {
    int id;
    volatile int age;
    public User(int id, int age) {
        this.id = id;
        this.age = age;
    }
    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", age=" + age +
                '}';
    }
}
package com.szh.atominintegerfiledupdater;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
 *
 */
public class SubThread extends Thread {
    //创建要更新的User对象
    private User user;
    //创建 AtomicIntegerFieldUpdater 更新器
    private AtomicIntegerFieldUpdater<User> updater=AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
    public SubThread(User user) {
        this.user=user;
    }
    @Override
    public void run() {
        //在子线程中对User对象的age字段进行自增操作
        for (int i=0;i<10;i++) {
            System.out.println(updater.getAndIncrement(user));
        }
    }
}

上面这段代码,我定义了一个字段更新器,它的构造函数中传入两个值,第一个是User.class表示要更新User这个类(传入该类的字节码文件),第二个参数代表你要更新的字段名(必须是volatile修饰的)。

下面这段代码,main主线程中开启了10个子线程,每个线程执行自己的run方法时,都对age字段进行10次的自增操作,那么最终age字段一共会被自增10*10=100次,而age的初始值我们传入了20,最终age=20+100=120

package com.szh.atominintegerfiledupdater;
/**
 *
 */
public class Test {
    public static void main(String[] args) {
        User user=new User(1001,20);
        //开启10个线程
        for (int i = 0; i < 10; i++) {
            new SubThread(user).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(user);
    }
}

2.5 AtomicReference

案例1:这个原子类属于引用型的,可以原子的读写一个对象。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
 * 使用 AtomicReference 原子读写一个对象
 */
public class Test01 {
    //创建一个AtomicReference对象
    static AtomicReference<String> reference=new AtomicReference<>("abc");
    public static void main(String[] args) throws InterruptedException {
        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("abc","def")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串abc被修改为了def");
                    }
                }
            }).start();
        }
        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("def","abc")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串def还原为了abc");
                    }
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(reference.get());
    }
}


案例2:演示AtomicReference中的ABA问题。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
 *
 */
public class Test02 {
    private static AtomicReference<String> reference=new AtomicReference<>("abc");
    public static void main(String[] args) throws InterruptedException {
        //创建一个线程,先把abc修改为def,再把def还原为abc
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def");
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.get());
                reference.compareAndSet("def","abc");
            }
        });
        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz"));
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(reference.get());
    }
}

2.6 AtomicStampedReference


这个原子类可以很好的规避CAS中的ABA问题。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
 * AtomicStampedReference 原子类可以解决 CAS 中的 ABA 问题
 * 在 AtomicStampedReference 原子类中有一个整数标记值 stamp,
 * 每次执行 CAS 操作时,需要对比它的版本,即比较 stamp 的值
 */
public class Test03 {
    //定义 AtomicStampedReference 引用操作"abc"字符串, 指定初始化版本号为 0
    private static AtomicStampedReference<String> reference=new AtomicStampedReference<>("abc",0);
    public static void main(String[] args) throws InterruptedException {
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def",reference.getStamp(),reference.getStamp()+1);
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.getReference());
                reference.compareAndSet("def","abc",reference.getStamp(),reference.getStamp()+1);
            }
        });
        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz",reference.getStamp(), reference.getStamp()+1));
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(reference.getReference());
    }
}

相关文章
|
1天前
|
Java
java开启线程的四种方法
这篇文章介绍了Java中开启线程的四种方法,包括继承Thread类、实现Runnable接口、实现Callable接口和创建线程池,每种方法都提供了代码实现和测试结果。
java开启线程的四种方法
|
4天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
5天前
|
缓存 前端开发 JavaScript
一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
【8月更文挑战第11天】一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
13 0
一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
5天前
|
Java 程序员 调度
深入浅出Java多线程编程
Java作为一门成熟的编程语言,在多线程编程方面提供了丰富的支持。本文将通过浅显易懂的语言和实例,带领读者了解Java多线程的基本概念、创建方法以及常见同步工具的使用,旨在帮助初学者快速入门并掌握Java多线程编程的基础知识。
4 0
|
5天前
|
Java
java中获取当前执行线程的名称
这篇文章介绍了两种在Java中获取当前执行线程名称的方法:使用`Thread`类的`getName`方法直接获取本线程的名称,以及使用`Thread.currentThread()`方法获取当前执行对象的引用再调用`getName`方法。
|
5天前
|
Java 测试技术
Java SpringBoot Test 单元测试中包括多线程时,没跑完就结束了
Java SpringBoot Test 单元测试中包括多线程时,没跑完就结束了
10 0
|
3月前
|
消息中间件 Java Linux
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
|
2月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
192 0
|
2月前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
64 1