《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )(三)

简介: 《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )

5.7 原子累加器

5.7.1 累加器性能比较

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/22 17:27
 */
@Slf4j(topic = "c.Test33")
public class Test33 {
    public static void main(String[] args) {
        demo(
                ()->new AtomicLong(0),
                adder->adder.getAndIncrement()
        ); 
        demo(
                ()->new LongAdder(),
                adder->adder.increment()
        );
    }
    /*
   () -> 结果    提供累加器对象
   (参数) ->     执行累加操作
    */
    private static <T> void demo(Supplier <T> adderSupplier, Consumer <T> action) {
        T adder = adderSupplier.get();
        List <Thread> ts = new ArrayList <>();
        // 4 个线程,每人累加 50 万
        for (int i = 0; i < 4; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        long start = System.nanoTime();
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start) / 1000_000);
    }
}

输出为:

2000000 cost:32
2000000 cost:14


**分析:**性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。


* 源码之 LongAdder


LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧

LongAdder 类有几个关键域


// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

注意:volatile保证共享变量的可见性,transient保证序列化下这些属性信息不会被序列化

5.7.2 CAS锁

//使用CAS实现Lock
//不推荐使用:因为会造成CPU的空转...
@Slf4j(topic = "c.Test42")
public class LockCas {
    // 0 没加锁
    // 1 加锁
    private AtomicInteger state = new AtomicInteger(0);
    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }
    public static void main(String[] args) {
        LockCas lock = new LockCas();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
                sleep(1);
            } finally {
                lock.unlock();
            }
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
            } finally {
                lock.unlock();
            }
        }).start();
    }
}

输出为:

16:35:41.029 c.Test42 [Thread-1] - begin...
16:35:41.028 c.Test42 [Thread-0] - begin...
16:35:41.032 c.Test42 [Thread-1] - lock...
16:35:41.032 c.Test42 [Thread-1] - unlock...
16:35:41.032 c.Test42 [Thread-0] - lock...
16:35:42.040 c.Test42 [Thread-0] - unlock...

* 原理之伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
  // 省略不重要代码
}

关于缓存行伪共享,得从缓存说起~ ~~

1. 缓存与内存的速度比较

c624fa5f78e65c4dd2dd0dd7365ae362.png

image.png


因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。


而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)


缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中


CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效


比如核心Core1和Core2都从内存中读入数据a = 2 到缓存中。之后Core1把数据改成了2000,此时为了保持数据的一致性,Core2中的数据也需要修改!即把Core2中的缓存行失效,重新去内存中取最新的数据


38671cdb3f38c450d4700bfe20ac9f91.png

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象(1行64个字节)。这样问题来了:


Core-0 要修改 Cell[0]

Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要修改为Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效


@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效 。


我们把一个缓存行加入了多个cell对象称为伪共享,注解的作用就是防止伪共享


bb055422607e760bbdde607e2b6cc00f.png


累加主要调用下面的方法

2. add()方法


public void add(long x) {
    // as 为累加单元数组
    // b 为基础值
    // x 为累加值
    Cell[] as;
    long b, v;
    int m;
    Cell a;
    // 进入 if 的两个条件
    // 1. as 有值, 表示已经发生过竞争, 进入 if
    // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // uncontended 表示 cell 没有竞争
        boolean uncontended = true;
        if (
            // as 还没有创建
            as == null || (m = as.length - 1) < 0 ||
            // 当前线程对应的 cell 还没有
            (a = as[getProbe() & m]) == null ||
            // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
            !(uncontended = a.cas(v = a.value, v + x))
        ) {
            // 进入 cell 数组创建、cell 创建的流程
            longAccumulate(x, null, uncontended);
        }
    }
}

add流程图

4aa71d24e219a078b9e23923ff99f157.png

3. longAccumulate()

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
    if ((h = getProbe()) == 0) {
        // 初始化 probe
        ThreadLocalRandom.current();
        // h 对应新的 probe 值, 用来对应 cell
        h = getProbe();
        wasUncontended = true;
    }
    // collide 为 true 表示需要扩容
    boolean collide = false;
    for (; ; ) {
        Cell[] as;
        Cell a;
        int n;
        long v;
        // 已经有了 cells
        if ((as = cells) != null && (n = as.length) > 0) {
            // cells存在,但是里面还没有 cell
            if ((a = as[(n - 1) & h]) == null) {
               //如果cellsBusy没有加锁,为 cellsBusy 加锁, 创建 cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                     // 创建cell,且cell 的初始累加值为 x
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {//进行加锁,并修改cellsBusy=1
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&//判断槽位是否为空
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {//不为空则进行下一次循环尝试
                                rs[j] = r;//为空,则将元素放到槽位上去
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)//如果上面cell创建成功,则退出
                            break;
                        continue;           // Slot is now non-empty
                    }
                // 成功则 break, 否则继续 continue 循环
            }
            // 有竞争, 改变线程对应的 cell 来重试 cas
            else if (!wasUncontended)
                wasUncontended = true;
            // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
            else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
            // 如果 cells 长度已经超过了最大长度(CPU的核数), 或者已经扩容, 改变线程对应的 cell 来重试 cas
            else if (n >= NCPU || cells != as)
                collide = false;
            // 确保 collide 为 false 进入此分支, 就不会进入最后的 else if 进行扩容了
            else if (!collide)
                collide = true;
            //修改标志位进行加锁
            else if (cellsBusy == 0 && casCellsBusy()) {
                // 加锁成功, 扩容
                try {
                    if (cells == as) {      // 扩容cells
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;        
            }
            // 改变线程对应的 cell,再重新循环
            h = advanceProbe(h);
        }
        // 还没有 cells, 尝试给 cellsBusy 加锁
        //cellsBusy == 0表示还没有其他线程为其加锁
        //cells == as表示还有没其他线程创建该cells数组(未新建)
        //casCellsBusy() 采用CAS方法将cellsBusy 从0 改为 1。这样做是为了防止其他线程也对其进行加锁..
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            // 加锁成功, 创建 cell数组, 最开始长度为 2。 并创建一个累加单元 cell
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    //注意:虽然我们创建的Cell数组大小是2,但是我们只向其加入了一个元素,
                    //不到万不得已不创建另外一个cell,懒惰化的思想~~~
                    rs[h & 1] = new Cell(x);
                    cells = rs;//赋值给cells
                    init = true;
                }
            } finally {//标志位设置为0,也就是解锁
                cellsBusy = 0;
            }
            // 成功则 break,退出循环;
            if (init)
                break;
        }
        // 上两种情况失败(如加锁失败), 尝试给 base 累加,如果失败则重新循环
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
            break;
    }
}

longAccumulate 流程图


cca3a6782b3859573ffcbb22ee5e8bdb.png

f6a47ebd2d867eabdcea54d461eece3a.png

每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)

fcc4d182cb63b96fd08a8762f77e1663.png

4. 获取最终结果通过 sum 方法

//对cells数组的元素进行累加
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

5.8 Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

unsafe 与线程安全性无关。因为比较接近底层,用来操作内存、线程,不建议我们直接使用

5.8.1 Unsafe CAS 操作

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/22 17:47
 */
@Slf4j(topic = "c.TestUnsafe")
public class TestUnsafe {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        //使用反射获取到Unsafe对象
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);
        System.out.println(unsafe);
        //1. 获取域的偏移地址
        long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
        long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
        Teacher teacher = new Teacher();
        //2. 执行cas操作
        unsafe.compareAndSwapInt(teacher,idOffset,0,1);
        unsafe.compareAndSwapObject(teacher,nameOffset,null,"张三");
        //3. 验证
        System.out.println(teacher);
    }
}
@Data
class Teacher{
    volatile int id;
    volatile String name;
}

输出为:

sun.misc.Unsafe@490d6c15
Teacher(id=1, name=张三)

6.8.2 unsafe对象模拟实现原子整数

使用自定义的 MyAtomicInteger 实现之前线程安全的原子整数类 AtomicInteger

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/22 18:13
 */
public class Test35 {
    public static void main(String[] args) {
        Account.demo(new MyAtomicInteger(10000));
    }
}
class MyAtomicInteger implements Account {
    private volatile int value;
    private static final long valueOffset;
    private static final Unsafe UNSAFE;
    public MyAtomicInteger(int value) {
        this.value = value;
    }
    static {
        UNSAFE = UnsafeAccessor.getUnsafe();
        try {
            valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
    private int getValue(){
        return value;
    }
    public void decrement(int amount){
        while (true){
            int prev = this.value;
            int next = prev-amount;
            if(UNSAFE.compareAndSwapInt(this,valueOffset,prev,next)){
                break;
            }
        }
    }
    @Override
    public Integer getBalance() {
        return getValue();
    }
    @Override
    public void withdraw(Integer amount) {
        decrement(amount);
    }
}
interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List <Thread> ts = new ArrayList <>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

本章小结

  • CAS 与 volatile
  • API
  • 原子整数
  • 原子引用
  • 原子数组
  • 字段更新器
  • 原子累加器
  • Unsafe
  • 原理方面*
  • LongAdder 源码
  • 伪共享
相关文章
|
8月前
|
安全 测试技术 Python
详解增强算术赋值:“-=”操作是怎么实现的?
详解增强算术赋值:“-=”操作是怎么实现的?
54 2
|
5月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
83 1
|
6月前
|
安全 Oracle Java
(四)深入理解Java并发编程之无锁CAS机制、魔法类Unsafe、原子包Atomic
其实在我们上一篇文章阐述Java并发编程中synchronized关键字原理的时候我们曾多次谈到过CAS这个概念,那么它究竟是什么?
141 1
|
安全 Java 程序员
JUC中创建的组件 && 多线程使用“哈希表”
JUC中创建的组件 && 多线程使用“哈希表”
61 0
深入理解JUC:第三章:AtomicReference原子引用
深入理解JUC:第三章:AtomicReference原子引用
182 0
深入理解JUC:第三章:AtomicReference原子引用
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )(二)
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )(二)
|
缓存 安全 Java
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )
《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )
|
安全
多线程编程核心技术-对象及变量的并发访问-synchronize同步方法(2)(上)
多线程编程核心技术-对象及变量的并发访问-synchronize同步方法(2)(上)
118 0
多线程编程核心技术-对象及变量的并发访问-synchronize同步方法(2)(上)
|
缓存 Java
【多线程:cas】原子更新器 原子累加器 缓存一致性问题
【多线程:cas】原子更新器 原子累加器 缓存一致性问题
156 0