Java——多线程高并发系列之理解CAS、原子变量类的使用

简介: Java——多线程高并发系列之理解CAS、原子变量类的使用

文章目录:


1.CAS

2.原子变量类

2.1AtomicInteger

2.2 AtomicLong

2.3 AtomicIntegerArray

2.4 AtomicIntegerFieldUpdater

2.5 AtomicReference

2.6 AtomicStampedReference

1.CAS


CAS(Compare And Swap)是由硬件实现的。CAS 可以将 read- modify - write 这类的操作转换为原子操作。

i++自增操作包括三个子操作:  从主内存读取 i 变量值;对 i 的值加 1;再把加 1 之后的值保存到主内存。

CAS原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。

下面我说一下,我对上面这张图中CAS算法的理解:首先在主内存有一个所有线程都共享的count变量,它的值初始化为10

1.    线程1从主内存中读取count变量的值到自己的工作内存中,此时count10。(这个值我们记为:线程1期望的count值、最初读取到的count值)

2.    线程1对自己工作内存中的count的值进行自增操作,此时count11。(这个值我们记为:线程1更新之后的count值)

3.    这个时候,当线程1还未执行到第三步更新count的值到主内存中的时候,线程2突然抢到了CPU执行权,它进来从主内存中读取count变量的值到自己的工作内存中,因为线程1还未更新,所以此时count仍为10。(这个值我们记为:线程2期望的count值、最初读取到的count值)

4.    线程2对自己工作内存中的count的值进行自增操作,此时count11。(这个值我们记为:线程2更新之后的count值)

5.    在线程2要将自己工作内存中的count值更新到主内存之前,要执行一部重要的操作!!!就是再次读取主内存中共享变量count的值,如果此时读取到的这个值与最开始线程2期望的count值相等,那么就顺利更新count的值;如果不相等,就撤销放弃本次count++操作。

6.    那么线程2最开始期望的count值为10,而此时从主内存中读取到的count值还是10,一样,所以就更新,将自己工作内存中的count值更新到主内存中,此时count11

7.    那么线程2执行完了,此时线程1重新获得CPU执行权,它已经完成了count++的前两步了,就剩最后一步更新操作了,那么它此时同线程2一样,再次从主内存中读取共享变量count的值,一样就更新、不一样就撤销放弃操作。然而count的值已经被线程2更新过了,此时为11,而线程1最开始期望的count的值为10,两次读取到count的值不一样了!!!所以线程1就会自动撤销本次count++操作。

8.    最终,本次线程12count变量执行++操作的结果:线程1失败、线程2成功,最后count的值只增加了1次,为11


下面,我再来说一下什么是CAS中的ABA问题。

·       CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。

·       实际上这种假设不一定总是成立的。假如说有共享变量 count=100

·       A线程将 count 修改为 200

·       B线程将 count 修改为 300

·       C线程将 count 修改为 100

·       当前线程看到的count变量的值是100,这就与最初的count变量的值是一样的,那么是否就认为count变量的值没有被其他线程更新呢?显然不是啊,它明显的被AB两个线程更新过。

·       这就是CAS中的ABA问题,即共享变量经历了 A → B → A 的更新。

·       如果想要规避ABA问题,可以为共享变量引入一个修订好(或者叫时间戳),每次修改共享变量时,相应的修订号就会增加1ABA问题的过程就转变为:[A0] → [B1] → [A2] ,每次修改共享变量都会导致修订号的增加,通过修订号就可以准确的判断共享便是否被其他线程修改过。在原子类中,AtomicReference就会面临ABA问题的困扰,而AtomicStampedReference 可以很好的规避ABA问题。详细内容看下面的代码案例。


2.原子变量类


原子变量类基于CAS实现的,当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性。对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++。由于volatile只能保证可见性、无法保障原子性,原子变量类内部就是借助一个Volatile变量,并且保障了该变量的 read-modify-write 操作的原子性,有时把原子变量类看作增强的 volatile 变量。

原子变量类有 12 个,如:(这12个原子变量类全部位于JUC这个Java并发包的atomic子包下的)

分组

原子变量类

基础数据型

AtomicInteger AtomicLongAtomicBoolean

数组型

AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray

字段更新器

AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater

引用型

AtomicReferenceAtomicStampedReferenceAtomicMarkableReference

2.1AtomicInteger

package com.szh.volatiletest;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 使用原子类进行自增操作
 */
public class Test03 {
    public static void main(String[] args) throws InterruptedException {
        //在main线程中创建10个子线程
        for (int i = 0; i < 10; i++) {
            new MyThread().start();
        }
        Thread.sleep(1000);
        System.out.println(MyThread.count.get());
    }
    static class MyThread extends Thread {
        private static AtomicInteger count=new AtomicInteger();
        public static void addCount() {
            for (int i = 0; i < 1000; i++) {
                count.getAndIncrement();
            }
            System.out.println(Thread.currentThread().getName() + " count = " + count.get());
        }
        @Override
        public void run() {
            addCount();
        }
    }
}

首先是AtomicInteger,它其中的 getAndIncrement 方法就是 count++ 操作,get() 方法是获取最终的执行结果,可以看到10个子线程各自执行1000 count++ 操作,最终的结果正确无误:10000

2.2 AtomicLong

这里我是用 AtomicLong 定义一个简单的计数器,来模拟实现原子操作

main主线程中一共创建了1000个子线程,每个线程都看作一个请求,所以请求总数肯定是1000,然后我模拟偶数就算请求成功、奇数就算请求失败。那么这个时候,因为Java中的多线程是采用抢占式来调度的,所以这里成功和失败的比值一般来说不会是11(各500次),输出结果中,我们也看到了是494506,当然这个执行结果也不是唯一的。

package com.szh.atomiclong;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 使用原子变量类定义一个计数器
 * 该计数器,在整个程序中都能使用,所以这个计数器可以设计为单例
 */
public class Indicator {
    //构造方法私有化
    private Indicator() {}
    //定义一个私有的本类静态对象
    private static final Indicator INSTANCE=new Indicator();
    //提供一个公共静态方法,获取该类唯一的实例
    public static Indicator getInstance() {
        return INSTANCE;
    }
    //使用原子变量类
    private final AtomicLong requestCount=new AtomicLong(0); //记录请求总数
    private final AtomicLong successCount=new AtomicLong(0); //记录成功总数
    private final AtomicLong failureCount=new AtomicLong(0); //记录失败总数
    //有新的请求
    public void newRequestReceive() {
        requestCount.getAndIncrement();
    }
    //处理成功
    public void requestReceiveSuccess() {
        successCount.getAndIncrement();
    }
    //处理失败
    public void requestReceiveFialure() {
        failureCount.getAndIncrement();
    }
    //查看请求总数
    public long getRequestCount() {
        return requestCount.get();
    }
    //查看成功总数
    public long getSuccessCount() {
        return successCount.get();
    }
    //查看失败总数
    public long getFailureCount() {
        return failureCount.get();
    }
}
package com.szh.atomiclong;
import java.util.Random;
/**
 *
 */
public class Test {
    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //每个线程就是一个请求,请求总数加1
                    Indicator.getInstance().newRequestReceive();
                    int num=new Random().nextInt();
                    if (num % 2 == 0) { //模拟偶数为请求成功
                        Indicator.getInstance().requestReceiveSuccess();
                    }else { //模拟奇数为请求失败
                        Indicator.getInstance().requestReceiveFialure();
                    }
                }
            }).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印请求数
        System.out.println(Indicator.getInstance().getRequestCount()); //请求总数
        System.out.println(Indicator.getInstance().getSuccessCount()); //成功总数
        System.out.println(Indicator.getInstance().getFailureCount()); //失败总数
    }
}


2.3 AtomicIntegerArray


案例1AtomicIntegerArray中的常用方法。

package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
 * AtomicIntegerArray 原子数组类的基本操作
 */
public class Test01 {
    public static void main(String[] args) {
        //创建一个指定长度的原子数组
        AtomicIntegerArray array=new AtomicIntegerArray(10);
        System.out.println(array);
        //返回指定位置的元素
        System.out.println(array.get(0));
        System.out.println(array.get(8));
        //设置指定位置的元素
        array.set(0,20);
        //在设置某个位置元素的新值时,返回该元素的旧值
        System.out.println(array.getAndSet(1,66));
        System.out.println(array);
        //将某个位置元素的值,加上一个数,返回该元素的新值
        System.out.println(array.addAndGet(0,30));
        System.out.println(array.addAndGet(1,4));
        System.out.println(array);
        //CAS操作
        //如果数组中索引为0的元素的值为50,则将其替换为新值1,返回true
        //反之,如果该索引的元素的值不是50,则不进行替换,返回false
        System.out.println(array.compareAndSet(0,50,1));
        System.out.println(array);
        //自增/自减
        System.out.println(array.incrementAndGet(0)); // ++i
        System.out.println(array.getAndIncrement(1)); // i++
        System.out.println(array.decrementAndGet(2)); // --i
        System.out.println(array.getAndDecrement(3)); // i--
        System.out.println(array);
    }
}


案例2:定义一个原子数组,一个线程数组,10个线程分别将该原子数组中的每个元素自增1000次,所以最终得到的原子数组中,每个元素的值应该为:10*1000=10000

package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
 * 在多线程中使用 AtomicIntegerArray 原子数组类
 */
public class Test02 {
    //定义一个原子数组
    static AtomicIntegerArray array=new AtomicIntegerArray(10);
    public static void main(String[] args) {
        //定义一个线程数组
        Thread[] threads=new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new AddThread();
        }
        //开启子线程
        for (Thread thread : threads) {
            thread.start();
        }
        //在主线程中查看自增之后的原子数组中每个元素的值,这里需要等待所有子线程执行完毕之后再查看
        //所以这里可以使用join方法进行线程合并,把所有的子线程合并到当前主线程中
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //打印原子数组
        System.out.println(array);
    }
    //定义一个线程类,在这个类中修改原子数组
    static class AddThread extends Thread {
        @Override
        public void run() {
            //将原子数组中的每个元素自增1000次
            for (int j=0;j<1000;j++) {
                for (int i = 0; i < array.length(); i++) {
                    array.getAndIncrement(i % array.length());
                }
            }
        }
    }
}


2.4 AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater可以对原子整数字段进行更新,要求:

1.    字符必须使用 volatile 修饰,使线程之间可见。

2.    只能是实例变量,不能是静态变量,也不能使用 final 修饰。

在这个Java Bean中,我将age属性设置为volatile修饰,也就是说此时这个age对所有的线程都是可见的。

package com.szh.atominintegerfiledupdater;
/**
 *
 */
public class User {
    int id;
    volatile int age;
    public User(int id, int age) {
        this.id = id;
        this.age = age;
    }
    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", age=" + age +
                '}';
    }
}
package com.szh.atominintegerfiledupdater;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
 *
 */
public class SubThread extends Thread {
    //创建要更新的User对象
    private User user;
    //创建 AtomicIntegerFieldUpdater 更新器
    private AtomicIntegerFieldUpdater<User> updater=AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
    public SubThread(User user) {
        this.user=user;
    }
    @Override
    public void run() {
        //在子线程中对User对象的age字段进行自增操作
        for (int i=0;i<10;i++) {
            System.out.println(updater.getAndIncrement(user));
        }
    }
}

上面这段代码,我定义了一个字段更新器,它的构造函数中传入两个值,第一个是User.class表示要更新User这个类(传入该类的字节码文件),第二个参数代表你要更新的字段名(必须是volatile修饰的)。

下面这段代码,main主线程中开启了10个子线程,每个线程执行自己的run方法时,都对age字段进行10次的自增操作,那么最终age字段一共会被自增10*10=100次,而age的初始值我们传入了20,最终age=20+100=120

package com.szh.atominintegerfiledupdater;
/**
 *
 */
public class Test {
    public static void main(String[] args) {
        User user=new User(1001,20);
        //开启10个线程
        for (int i = 0; i < 10; i++) {
            new SubThread(user).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(user);
    }
}

2.5 AtomicReference

案例1:这个原子类属于引用型的,可以原子的读写一个对象。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
 * 使用 AtomicReference 原子读写一个对象
 */
public class Test01 {
    //创建一个AtomicReference对象
    static AtomicReference<String> reference=new AtomicReference<>("abc");
    public static void main(String[] args) throws InterruptedException {
        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("abc","def")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串abc被修改为了def");
                    }
                }
            }).start();
        }
        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("def","abc")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串def还原为了abc");
                    }
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(reference.get());
    }
}


案例2:演示AtomicReference中的ABA问题。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
 *
 */
public class Test02 {
    private static AtomicReference<String> reference=new AtomicReference<>("abc");
    public static void main(String[] args) throws InterruptedException {
        //创建一个线程,先把abc修改为def,再把def还原为abc
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def");
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.get());
                reference.compareAndSet("def","abc");
            }
        });
        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz"));
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(reference.get());
    }
}

2.6 AtomicStampedReference


这个原子类可以很好的规避CAS中的ABA问题。

package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
 * AtomicStampedReference 原子类可以解决 CAS 中的 ABA 问题
 * 在 AtomicStampedReference 原子类中有一个整数标记值 stamp,
 * 每次执行 CAS 操作时,需要对比它的版本,即比较 stamp 的值
 */
public class Test03 {
    //定义 AtomicStampedReference 引用操作"abc"字符串, 指定初始化版本号为 0
    private static AtomicStampedReference<String> reference=new AtomicStampedReference<>("abc",0);
    public static void main(String[] args) throws InterruptedException {
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def",reference.getStamp(),reference.getStamp()+1);
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.getReference());
                reference.compareAndSet("def","abc",reference.getStamp(),reference.getStamp()+1);
            }
        });
        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz",reference.getStamp(), reference.getStamp()+1));
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(reference.getReference());
    }
}

相关文章
|
2天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
33 14
|
5天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
34 13
|
6天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
1月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
109 17
|
1月前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
3天前
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
32 20
|
9天前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
2月前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
79 1
|
4月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
79 1
|
4月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
66 3