20.Atmoic系列Strimped64分段锁底层实现源码剖析

简介: 20.Atmoic系列Strimped64分段锁底层实现源码剖析

老王:小陈啊,上一章节我们对LongAdder的底层源码、实现机制进行了深入了剖析,包括AtomicInteger在高并发竞争下导致的大量自旋的问题,以及LongAdder是怎么使用分段锁优化这个问题的。

老王:我们还以银行办事大厅的常规窗口、备用窗口为例子说明了什么是分段锁机制,这些东西啊,都要记得啊;有上一章的基础、懂得什么是分段锁这一章我才能带你来深入的分析一下Striped64的分段锁机制的实现。

上章内容回顾

小陈:老王,放心吧,上一章的内容我还记着呢,我现在就可以给你总结一下上一章的内容

首先是分段锁的内容

(1)首先银行办事大厅访客比较少的时候,只开放常规窗口,就足以处理用户请求了

(2)但是由于一个窗口同一时间只能处理一个请求,所以在高并发的时候,容易造成大量的用户在等待常规窗口

(3)于是大流量的时候就开启备用窗口比如有4个备用窗口,然后在备用窗口已经开启的时候,就会使用用户id % 4 的算法将用户派到不同的备用窗口,这样减少对锁的竞争,大大提升了并发性能。

image.png

老王:没错,对分段锁的内容记得很牢固啊,看来我果然没有看错你啊。

老王:那你继续来说说LongAdder底层最核心的add源码流程是咋样的?

小陈:嘿嘿,这也难不倒我,我先给你上代码:


public void add(long x) {
    // as就类似我们上述说的备用窗口列表
    Cell[] as;
    // 这里的b就是常规窗口的值,v是你被分派到的那个窗口的value值(可能是常规窗口也可能是备用窗口)
    long b, v;
    // m是备用窗口的长度,
    // 上面我们讲过getProbe()方法就是获取用户id的方法
    // getProbe() & m 其实就是 用户id % 窗口总数,窗口分派的算法
    int m;
    // a就是你被派遣到的那个窗口
    Cell a;
    // 1.首先如果cells==null,说明备用窗口没有开放,
    // 全都执行casBase争抢常规窗口,cas成功则争抢成功,然后办完事就退出了
    // 如果争抢失败 casBase == false,则会进入if代码内重试
    // 2. 如果cells != null,说明备用窗口开发了,不用去争抢常规窗口了,
    // 直接就进入争抢备用窗口
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // 3. as == null || as.length - 1 < 0 说明备用窗口列表尚未开放
        if (as == null || (m = as.length - 1) < 0 ||
            // 4. as[getProbe() & m] 是你被派遣到的那个备用窗口
            // (a = as[getProbe() & m]) == null 你被派遣到的那个备用窗口还没人工作
            (a = as[getProbe() & m]) == null ||
            // 5. a.cas() 就是你被分派到窗口a后,去尝试争抢窗口a的权限
            // 如果 uncontented就是你争抢的结果,如果!uncnotented == true,说明你争抢失败了
            !(uncontended = a.cas(v = a.value, v + x)))
            // 相当于上面操作都失败之后的一种兜底方案
            longAccumulate(x, null, uncontended);
    }
}

小陈:然后给你重新画一下上面的这个代码的流程图:

image.png

老王:嗯嗯,可以的,看来你记得相当牢固了,好了,那我也就不多废话了,直接进入本章主题吧。

老王:上一章我们讲LongAdder的时候,讲了怎么使用分段锁、怎么减少并发的竞争,以及它在竞争窗口的时候具体的流程我们上面已经画图讲解了 ,最后剩下的只是没有讲解的longAccumulate的源码了。

老王:那这一章我们就来分析分析,从longAccmulate方法内部源码角度了解的分段锁实现机制到底是怎么样子的?

Striped64底层实现

longAccumulate底层源码:

我们首先来看一下longAccumulate底层使用分段锁实现加减操作的源码,源码稍微有点复杂,但是我们慢慢来分析,别害怕哈。

我们先从整体来看下这个方法内部做了几件事

image.png

longAccumulate里面做了几件事情,先从大角度分析一下:

(1)进入方法,黄色部分,首先就是获取一下用户id,如果为0,先强制初始化一下用户id

(2)然后就是进入for死循环里面,只有用户的请求成功处理了,才会退出循环

(3)然后就是下面比较重要的三个大的分支条件

进入备用窗口处理


// cells是备用窗口列表,如果备用窗口不等于null,
// 并且是length>0 说明备用窗口开启了
// 则用户进入这个条件,去备用窗口列表里面处理
if ((as = cells) != null && (n = as.length) > 0)

初始化备用窗口

如果上面的分支不满足,说明 cells == null 或者 cells.length <= 0说明备用窗口没开启啊,这个时候就要开启一下备用窗口即进行备用窗口列表数组的初始化工作:


// cellsBusy 是备用窗口列表是否在初始化或者扩容标志
// cellsBusy == 0 说明尚未有人初始化或者扩容
// 这时候执行到这里的线程,就要进行对备用窗口初始化了
// 比如说线程1走到这里,发现没初始化,就要执行一下casCellBusy告诉别人正在初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy())

竞争常规窗口

如果cellsBusy == 1 说明有人在初始化备用窗口或者对备用窗口列表进行扩容这个时候备用窗口列表不可用,只能去常规窗口争抢


// 直接对常规窗口争抢,如果成功就退出死循环了
else if (casBase(v = base, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
    break;

老王:小陈啊,上面我先划分的这三种情况,大体上能看懂不?

小陈:理解起来还是有点难,能不能画个图出来讲一下?

老王:没问题,这就给你画个图说一下:

image.png

(1)比如用户1进入了这里之后,首先判断分支1,发现备用窗口没有启用;然后直接进入分支2,去初始化备用窗口

(2)然后用户2进来了,发现备用窗口没启用,同时发现有人在初始化备用窗口直接进入分支3,去争抢常规窗口了

(3)备用窗口初始化好了开启了之后;用户的请求进来都走到分支1,去备用窗口列表去竞争去了。

老王:我这个讲解你能听得懂不?

小陈:你这么画图和文字说明,我就理解了。

老王:那我们来继续,也就是进入longAccumulate方法的时候,由于备用窗口没有开启,所以最开始有一个线程会进入分支2,我们就先来看看分支2的源码实现:


else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    // init是否初始化完成,最开始设置为false
    boolean init = false;
    try {                           // Initialize table
        // 然后这里就没什么复杂的,就是创建一个数组
        // 但是这个时候数组里面没有元素,每个元素都是空的
        if (cells == as) {
            Cell[] rs = new Cell[2];
            // 创建一个新的窗口,把自己的操作数x交给新创建的窗口
            rs[h & 1] = new Cell(x);
            cells = rs;
            // 然后设置初始化完成表示为true
            init = true;
        }
    } finally {
        // 重新设置cellsBusy标识,操作完成,没人在初始化或者扩容了
        cellsBusy = 0;
    }
    // 如果初始化完成,退出循环
    if (init)
        break;
}

小陈:嗯嗯,这个初始化看起来还是很简单的,没有什么特别复杂的操作。

老王:好,那我们来看看一个比较复杂的,分支1的源码:


if ((as = cells) != null && (n = as.length) > 0) {
    // 如果自己被派到的那个备用窗口为null,说明窗口还么人工作,则叫一个工作人员过来负责
    if ((a = as[(n - 1) & h]) == null) {
        // 如果没人在初始化,下面则创建一个窗口(叫一个工作人员过来处理)
        if (cellsBusy == 0) {
            // 创建一个新的窗口,同时提交自己的请求x       
            Cell r = new Cell(x);
            // 如果没人在初始化   
            if (cellsBusy == 0 && casCellsBusy()) {
                boolean created = false;
                try {               
                    Cell[] rs; int m, j;
                    // 再次检查备用窗口列表初始化好了
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                        // 设置自己创建的新窗口   
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (created)
                    break;
                continue;           // Slot is now non-empty
            }
        }
        collide = false;
    }
    // 如果到这里已经知道cas操作失败了,则重新设置一下失败标识,进入下一循环重试
    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
    // 根据 用户id % 备用窗口总数 算法,自己应该争抢哪个备用窗口
    // 如果争抢成功,则操作结束了,如果失败进入下一循环重试   
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x))))
        break;   
    else if (n >= NCPU || cells != as)
        collide = false;           
    else if (!collide)
        collide = true;
    // 走到这里,说明上面的操作都失败了,备用窗口竞争还是很激烈
    // 所以需要扩容了,多增加一些备用窗口,缓解竞争激烈的情况    
    else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            // 这里就是具体的扩容操作了
            if (cells == as) {      // Expand table unless stale
                // 新的备用窗口是原来的2倍
                Cell[] rs = new Cell[n << 1];
                // 原来已经存在的备用窗口重新赋值一下
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }
    // 在这里重新计算一下自己的用户id,
    // 使得用户id尽量的分散到不同的窗口,减少竞争
    h = advanceProbe(h);
}

我们在对上面的核心步骤,截图给你再讲一下:

image.png

image.png

老王:小陈啊,Striped64分段锁的源码我们就讲到了这里,你听懂了吗?

小陈:老王,感觉还是有点模糊,我回去再结合你上面的备注,重新理解理解。

老王:嗯嗯,其实里面机制并不算繁杂,每个流程都有一个大的分支,你顺着分支慢慢就理解了多看几遍就好了;源码这个东西就是这样,看一遍不会在整合注释多看个两遍、三遍就慢慢理解了

小陈:嗯嗯,好的......

老王:好了,小陈,这一篇文章也是Atomic系列中的最后一章了。我来简单总结一下在《结丹篇》我们学习的内容,一共讲解了下面的这些东西:

1. 讲解了unsafe是JDK底层提供的一个类,直接通过操作系统级别,可直接操作内存、进行CAS操作、阻塞和唤醒线程;

2. 讲解了CAS底层的原理,是怎么通过总线协调多个CPU进行加锁的,CAS是怎么保证原子性的

3.讲解了Atomic系列的常用的原子类,以及深入到源码级别、实现级别的分析;包括AtomicInteger底层是怎么通过unsafe的CAS操作和volatile保证线程安全的,AtomicBoolean的底层原理等。

4.讲解了AtomicReference底层是怎么通过多个操作合并为一个对象解决多个变量修改的原子性问题,以及CAS操作中经典的ABA问题是什么,以及AtomicStampedReference是怎么通过版本解决ABA问题的?

5.讲解了AtomicInteger在高并发下的劣势,锁竞争激烈导致大量线程自旋;

6.讲解了什么是分段锁?LongAdder底层是怎么通过分段锁解决AtomicInteger在高并发下的劣势的?还分析了LongAdder底层实现的原理是啥?

7.最后分析了LongAdder的父类Striped64,在底层是怎么实现分段锁的,深入到源码级别去分析了分段锁的实现。

老王:小陈啊,到了这里,恭喜你了,你JAVA并发的水平已经有了大幅度的提高,已经正式进入《结丹》境界了。

小陈:我终于到了《结丹》境界了,经历了前面的《筑基篇》、《练气篇》的修炼,以及本篇《结丹篇》的修炼,我也终于到达这个境界了,哈哈......

老王:是啊,经过了前面几篇的学习,你的基础实力有了很大幅度的提高,正式进入《结丹》境界了,但是不要高兴得太早。

小陈:恩恩,老王,我会继续努力的,在这个境界我会重新回去看一下之前讲过的文章,再加深自己的理解,争取在《结丹》境让自己的修为更加凝固

老王:嗯嗯,你有这个觉悟很不错啊,哈哈

小陈:老王,《结丹》篇结束了,我们下一篇学习什么?

老王:下一篇我们进入《金丹篇》 ,这一篇非常重要,讲解的是AQS(抽象队列同步器)的底层原理,要知道JAVA里面的同步器基本都是基于AQS进行实现的;包括ReentrantLock 互斥锁、ReadWriteReentrantLock 读写锁、CountDownLatch 门栓、Semaphore信号量等,我们从源码级别和实现上分析AQS的底层原理,以及基于AQS之上构建的各种同步工具是怎么实现的? 这一篇内容非常多、非常重要、非常丰富,小陈你要准备好了,这一篇才是区分普通人和并发高手的一个标志。

小陈:哇塞,终于讲到AQS和各种同步工具类了,我对这个期待已久了,如今终于要学习了....,老王牛逼......

小陈:那我们下一篇见咯。


相关文章
|
7月前
|
安全 算法 Java
剑指JUC原理-19.线程安全集合(上)
剑指JUC原理-19.线程安全集合
56 0
|
2月前
|
存储 机器学习/深度学习 安全
ConcurrentHashMap核心原理,这次彻底给整明白了!
ConcurrentHashMap核心原理,这次彻底给整明白了!
ConcurrentHashMap核心原理,这次彻底给整明白了!
|
6月前
|
安全
ConcurrentHashMap1.7分段锁原理
ConcurrentHashMap1.7分段锁原理
44 0
|
4月前
|
存储 安全 容器
【多线程面试题二十一】、 分段锁是怎么实现的?
这篇文章解释了分段锁的概念和实现方式,通过将数据分成多个段并在每段数据上使用独立锁,从而降低锁竞争,提高并发访问效率,举例说明了`ConcurrentHashMap`如何使用分段锁技术来实现高并发和线程安全。
【多线程面试题二十一】、 分段锁是怎么实现的?
|
4月前
|
安全 Java
【Java集合类面试十三】、HashMap如何实现线程安全?
实现HashMap线程安全的方法包括使用Hashtable类、ConcurrentHashMap,或通过Collections工具类将HashMap包装成线程安全的Map。
|
4月前
|
Java 调度
【多线程面试题十四】、说一说synchronized的底层实现原理
这篇文章解释了Java中的`synchronized`关键字的底层实现原理,包括它在代码块和方法同步中的实现方式,以及通过`monitorenter`和`monitorexit`指令以及`ACC_SYNCHRONIZED`访问标志来控制线程同步和锁的获取与释放。
|
4月前
|
Java
【多线程面试题十三】、说一说synchronized与Lock的区别
这篇文章讨论了Java中`synchronized`和`Lock`接口在多线程编程中的区别,包括它们在实现、使用、锁的释放、超时设置、锁状态查询以及锁的属性等方面的不同点。
|
5月前
|
存储 安全 Java
Java面试题:请解释Java内存模型,并说明如何在多线程环境下使用synchronized关键字实现同步,阐述ConcurrentHashMap与HashMap的区别,以及它如何在并发环境中提高性能
Java面试题:请解释Java内存模型,并说明如何在多线程环境下使用synchronized关键字实现同步,阐述ConcurrentHashMap与HashMap的区别,以及它如何在并发环境中提高性能
47 0
|
7月前
|
存储 缓存 监控
深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)
Java线程池的核心组件包括核心线程数、最大线程数、队列容量、拒绝策略等。核心线程数是线程池长期维持的线程数量,即使这些线程处于空闲状态也不会被销毁;最大线程数则是线程池允许的最大线程数量,当任务队列已满且当前线程数未达到最大线程数时,线程池会创建新线程执行任务;队列容量决定了任务队列的最大长度,当新任务到来时,如果当前线程数已达到核心线程数且队列未满,任务将被放入队列等待执行;拒绝策略则定义了当线程池无法处理新任务时的行为,如抛出异常、丢弃任务等。
119 1
|
7月前
|
算法
19.Atomic系列之LongAdder的底层原理(分段锁提升并发性能)
19.Atomic系列之LongAdder的底层原理(分段锁提升并发性能)
70 0
19.Atomic系列之LongAdder的底层原理(分段锁提升并发性能)
下一篇
DataWorks