史上最全的Java容器集合之ConcurrentHashMap1.8(源码解读)(一)

简介: 史上最全的Java容器集合之ConcurrentHashMap1.8(源码解读)(一)

ConcurrentHashMap

Tips:其实今天讲它肯定是大概的过一下,它既然是一个线程安全的容器,那么线程安全也要涉及到很多的知识点,比如

  • 悲观锁与乐观锁
  • 原子性,指令有序性和线程可见性
  • 无锁算法
  • 内存屏障
  • Java内存模型

这些目前等讲JVM的时候我们再一起去探讨吧,我们今天主要是过一下它,等后面把知识点串起来就会明白的。

跟HashTable 的区别,1.7和1.8的比较

ConcurrentHashMap是conccurrent家族中的一个类,由于它可以高效地支持并发操作,以及被广泛使用,经典的开源框架Spring的底层数据结构就是使用ConcurrentHashMap实现的。与同是线程安全的老大哥HashTable相比,它已经更胜一筹,因此它的锁更加细化,而不是像HashTable一样为几乎每个方法都添加了synchronized锁,这样的锁无疑会影响到性能。

1.7和1.8实现线程安全的思想也已经完全变了其中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。

继承结构

image.png

跟HashMap就是一模一样

其实一个类是用来干嘛的再类的最开始的介绍是很详细的,但是我的渣渣英语水平,太难了,如果有能力的童鞋可以去好好看看。总结一下说了啥:

  • JDK1.8底层是散列表+红黑树
  • ConCurrentHashMap支持高并发的访问和更新,它是线程安全的
  • 检索操作不用加锁,get方法是非阻塞的
  • key和value都不允许为null

常量

  //表的最大容量
  private static final int MAXIMUM_CAPACITY = 1 << 30;
  //默认表的容量
    private static final int DEFAULT_CAPACITY = 16;
  //最大数组长度
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  //默认并发级别
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  //负载因子
    private static final float LOAD_FACTOR = 0.75f;
  //树化阈值
    static final int TREEIFY_THRESHOLD = 8;
  //去树化阈值
    static final int UNTREEIFY_THRESHOLD = 6;
  //树化的最小容量
    static final int MIN_TREEIFY_CAPACITY = 64;
  //转移的最小值
    private static final int MIN_TRANSFER_STRIDE = 16;
  //生成sizeCtl的最小位数
    private static int RESIZE_STAMP_BITS = 16;
  //进行扩容允许的最大线程数
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
  //sizeCtl所需要的偏移位数
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  //标志值
    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
  //cpu数量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
  //序列化属性
    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };
复制代码

大部分的和HashMap差不多,只有少数的不同

成员变量

     //node数组,第一次插入节点时初始化 都加了内存屏障
    transient volatile Node<K,V>[] table;
     //用于扩容
    private transient volatile Node<K,V>[] nextTable;
     //计算器值,通过CAS修改值,没有竞争时使用,或者出现多线程初始化时回滚
    private transient volatile long baseCount;
     //初始化和扩容的标志,concurrent包中有很多类似用法
     // -1 初始化中; -N (N-1)个线程在扩容;table没有数据 初始化的大小 ; table有数据 下一次扩容的大小
  // 非常重要的一个属性,源码中的英文翻译,直译过来是下面的四行文字的意思
  //     sizeCtl = -1,表示有线程正在进行真正的初始化操作
  //     sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
  //     sizeCtl > 0,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的threshold
  //     sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
    //sizeCtl = -(1 + nThreads)这个,网上好多都是用第二句的直接翻译去解释代码,这样理解是错误的
  // 默认构造的16个大小的ConcurrentHashMap,只有一个线程执行扩容时,sizeCtl = -2145714174,
  //     但是照这段英文注释的意思,sizeCtl的值应该是 -(1 + 1) = -2
  // sizeCtl在小于0时的确有记录有多少个线程正在执行扩容任务的功能,但是不是这段英文注释说的那样直接用 -(1 + nThreads)
  // 实际中使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重  叠,
  //     当有n个线程正在执行扩容时,sizeCtl在值变为 (基数 + n)
  // 1.8.0_111的源码的383-384行写了个说明:A generation stamp in field sizeCtl ensures that resizings do not overlap.
    private transient volatile int sizeCtl;
    /**
     * The next table index (plus one) to split while resizing.
     */
     //transfer的table索引
    private transient volatile int transferIndex;
     //扩容或创建counterCells的自旋锁
    private transient volatile int cellsBusy;
     // CounterCell数组
    private transient volatile CounterCell[] counterCells;
    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;
复制代码

这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。

  • 负数代表正在进行初始化或扩容操作
  • -1代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。

构造方法

image.png

其实和HashMap是大同小异的,此时ConcurrentHashMap的构造方法逻辑和HashMap基本一致,只是多了concurrencyLevel和SizeCtl。 而且此时也没有初始化table,它是要等到第一次put的时候才初始化table,

成员方法

ConcurrentHashMap#initTable()

image.png


再我们put方法中,首先会判断我们存放数据的table是否为null如果为null,这个时候就要初始化我们的方法了

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)// sizeCtl < 0 标示有其他线程正在进行初始化操作,把线程让出cpu,对于table的厨师操作,只能有一个线程在进行
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    //如果把sizeCtl原子更新为-1成功,则当前线程进入初始化
            // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环
            // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU
            // 如果下一次循环更新完毕了,则table.length!=0,退出循环
                try {<br>                    // 为什么还要判断,因为:如果走到下面的finally改变了sizeCtl值,有可能其他线程是会进入这个逻辑的
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认大小是16
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); // 0.75*n,下一次扩容阈值
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
复制代码

再这个初始化过程中,就已经有乐观锁的实现了。 可以看出table的初始化在一个cas方法中进行,当table为null或者长度为0时进入while方法。

进入之后判断sizeCtl的指,如果小于0则线程让步。由于初始状态sizeCtl是等于0的,说明前面已经有线程进入了elseif这部分,将sc的值置为-1,表示正在初始化。

如果sc大于0,则取sc,否则取默认容量16。然后计算下一次元素数量达到多少时需要resize。总结一下初始化方法

  • 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
  • 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
  • 否则的话初始化一个默认大小(16)的数组
  • 然后设置sizeCtl的值为数组长度的3/4

ConcurrentHashMap#transfer(Node

[],Node

)

image.png

image.png

该方法的目的是实现数组的转移,即ConcurrentHashMap的扩容逻辑。就是HashMap的resize方法

在ConcurrentHashMap中,扩容虽然和HashMap一样,将Node数组的长度变为原来的两倍,但是为了保证多线程的同步性,ConcurrentHashMap引入了nextTable属性。在扩容过程中,大致可以分为三步:

  • 初始化一个空数组nextTable,长度为node数组的两倍,用作扩容后的数组的临时存储。
  • 将原来的node数组复制到nextTable中。
  • 将nextTable赋给原来的Node数组,并将nextTable置为null,修改sizeCtl。 ConcurrentHashMap通过遍历实现数组的复制,根据数组中Node节点的类型不同做不同处理。
  • (1)普通Node类型,表示链表中的节点,根据其下标i放入对应的nextTable中i和n+i的位置,采用头插法,链表顺序与原来相反
  • (2)ForwardingNode类型,表示已经处理过
  • (3)TreeBin类型,表示红黑树的节点,进行红黑树的复制,并考虑是否需要去树化。
  • (4)null,表示此处没有节点,加入ForwardingNode节点。根据上面的ConcurrentHashMap#helpTransfer(Node<K,V>[], Node<K,V>)可以知道,ForwardingNode类型的节点会触发其它线程加入数组的复制过程,即并发扩容。 另外,ConcurrentHashMap复制数组时的遍历是从n到0进行遍历的,并且不会完全遍历,而是按照线程数量分成若干个小人物,每个线程每次负责复制stride(步进)个桶([transfer-stride, transfer-1])。任务完成后可以再次申请。stride根据cpu数量而定,最小为16。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //确定stride
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating 
    //初始化,即使多个线程同时进入,也只不过是创建了多个Node<K,V>[]数组nt,在赋值给nextTab时后者覆盖前者,线程必然安全
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //数组元素复制结束的标志位
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //advance表示该节点是否处理成功,处理成功后继续遍历,否则该节点再次处理(CAS)
    boolean advance = true;
    //循环是否接受的标志
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
              //数组复制结束后的操作
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1); 原数组长度的1.75倍,即扩容后的0.75倍
                return;
            }
             //利用CAS方法更新sizeCtl,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作  
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
         //如果遍历到的节点为空 则放入ForwardingNode指针  
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        //如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
          //synchronized锁保证节点复制的线程安全
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    //链表节点,头插法得到ln和hn两条链表,分别对应nextTable中下标i和n+i的元素
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //红黑树节点,先尾插法得到由TreeNode组成的ln和hn两条链表,分别对应nextTable中下标i和n+i的元素,然后作为参数传入TreeBin的构造方法
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
复制代码

image.png



我只能说复杂的一批。扩容是最复杂的,多线程的数据复制,还有红黑树的转换。脑子不够用。大神们去探究吧,我就记记结论吧

目录
相关文章
|
5天前
|
存储 前端开发 Java
深入探索Java集合框架:核心组件与高效应用
深入探索Java集合框架:核心组件与高效应用
|
2天前
|
存储 IDE Java
32. 【Java教程】集合
32. 【Java教程】集合
9 1
|
2天前
|
NoSQL Java Redis
【Redis】 Java操作客户端命令——集合操作与有序集合操作
【Redis】 Java操作客户端命令——集合操作与有序集合操作
|
5天前
|
存储 安全 算法
Java语言中的集合框架:深入解析与应用
Java语言中的集合框架:深入解析与应用
|
5天前
|
存储 算法 Java
Java语言中的集合框架:深入解析与应用
Java语言中的集合框架:深入解析与应用
|
16天前
|
Prometheus 监控 Cloud Native
构建高效稳定的Docker容器监控体系
【5月更文挑战第20天】 在微服务架构日益普及的今天,Docker作为其重要的实现技术之一,承载着大量应用的运行。然而,随之而来的是对于容器健康状态、资源使用情况以及性能指标的监控需求急剧增加。本文旨在探讨构建一个高效且稳定的Docker容器监控体系,不仅涵盖了监控工具的选择与配置,还详细阐述了监控数据的分析与处理流程。通过精心设计的监控策略和实时响应机制,我们能够确保系统的稳定性,并及时发现及处理潜在的问题。
|
5天前
|
Docker 容器
docker: 如何不新建容器 修改运行容器的端口
docker: 如何不新建容器 修改运行容器的端口
|
5天前
|
数据管理 Linux Docker
docker 数据管理 与容器互联
docker 数据管理 与容器互联
|
6天前
|
Linux Docker 容器
蓝易云 - 【Linux】如何在linux系统重启或启动时执行命令或脚本(也支持docker容器内部)
以上就是在Linux系统和Docker容器中设置启动时运行命令或脚本的方法。希望对你有所帮助。
80 0
|
7天前
|
大数据 Linux Docker
mac docker 宿主机和容器间网络打通
mac docker 宿主机和容器间网络打通
6 0