认真学习jdk1.8下ConcurrentHashMap的实现原理

简介: 认真学习jdk1.8下ConcurrentHashMap的实现原理

1.7 已经解决了并发问题,并且能支持 N 个 Segment 这么多次数的并发,但依然存在 HashMap 在 1.7 版本中的问题—查询、遍历链表效率太低。jdk1.8 做了一些数据结构上的调整,先来看下底层的组成结构(其实和jdk1.8下HashMap的数据结构一致,就是数组+链表+红黑树):




2943fe4a768f4a148e8258a84fa80d35.png

其抛弃了原有的 Segment 分段锁(ReentrantLock),在put时采用了 CAS + synchronized 来保证并发安全性。


也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的,其中的 val , next 都用了 volatile 修饰,保证了内存可见性。


下文中的索引位置就是tab[(n-1)&hash]结点,也可以称之为“槽位”。

【1】核心属性和构造

① 核心属性

 // 最大容量--key-value键值对的最大个数 32bit的最高两位用于控制目的
private static final int MAXIMUM_CAPACITY = 1 << 30;
 //哈希桶-table默认初始化大小 
private static final int DEFAULT_CAPACITY = 16;
 //最大可能数组size(non-power of two),toArray 和关联方法使用
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 //默认并发级别,定义但没有被使用--为前个版本备留
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 // 负载因子,默认和HashMap一样  0.75
private static final float LOAD_FACTOR = 0.75f;
 // 链表树化的临界值 如果链表结点个数大于等于8,且tab.length>=64 就转为树
static final int TREEIFY_THRESHOLD = 8;
/**
 * The bin count threshold for untreeifying a (split) bin during a
 * resize operation. Should be less than TREEIFY_THRESHOLD, and at
 * most 6 to mesh with shrinkage detection under removal.
 */
 // 树转为链表的临界值,结点个数小于等于6,就转为链表
static final int UNTREEIFY_THRESHOLD = 6;
 // 树化的一个前提条件-数组.length>=64,否则将扩容而不是树化
static final int MIN_TREEIFY_CAPACITY = 64;
/**
 * Minimum number of rebinnings per transfer step. Ranges are
 * subdivided to allow multiple resizer threads.  This value
 * serves as a lower bound to avoid resizers encountering
 * excessive memory contention.  The value should be at least
 * DEFAULT_CAPACITY.
 */
private static final int MIN_TRANSFER_STRIDE = 16;
 //sizeCtl中用于生成戳的位数。对于32位阵列,必须至少为6。
private static int RESIZE_STAMP_BITS = 16;
 //可以帮助resize的最大线程数。必须适合32位-RESIZE_STAMP_BITS位。
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 //在sizeCtl中记录大小戳(size stamp)的偏移位。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//结点哈希字段的编码
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//   上面的都是静态常量,下面的是成员变量 均使用了volatile 修饰---------------------
 //常见的哈希桶,当第一次插入数据时进行初始化,大小为2的N次幂
transient volatile Node<K,V>[] table;
 //扩容后的数组,其实就是HashMap扩容过程中的newTab
 //扩容结束后,该数组被置为null
 //在transfer中被初始化
private transient volatile Node<K,V>[] nextTable;
 //基本计数器值,主要在没有争用时使用,但在表初始化竞争期间也用作后备。通过CAS更新。
private transient volatile long baseCount;
// table初始化或者扩容控制 。
//如果为负,则表正在初始化或扩容:
//* -1表示初始化
//* 或者就是-(1+活跃的扩容现场个数)
//否则,当table为null,将保留创建时使用的初始表大小,或默认为0
//初始化后,保存下一个要调整表大小的元素计数值--也就是阈值/临界值。
private transient volatile int sizeCtl;
 // 扩容时,下一个table index(plus one)
private transient volatile int transferIndex;
 // 自旋锁(通过CAS锁定) 当扩容或者创建CounterCells使用
private transient volatile int cellsBusy;
 //非空时,大小为2的N次幂
private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;


② 核心数据结构Node

与jdk1.7的ConcurrentHashMap不同的时,这里将HashEntry换成了Node。以便树化的时候采用TreeNode结点。Node的成员与HashEntry一致,都是hash、key、val、next。并且val和next使用了volatile 修饰。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
 //...       
}


其子类TreeNode用来表示红黑树中的树结点,其与HashMap中树结点结构是一致的。

static final class TreeNode<K,V> extends Node<K,V> {
       TreeNode<K,V> parent;  // red-black tree links
       TreeNode<K,V> left;
       TreeNode<K,V> right;
       TreeNode<K,V> prev;    // needed to unlink next upon deletion
       boolean red;
       TreeNode(int hash, K key, V val, Node<K,V> next,
                TreeNode<K,V> parent) {
           super(hash, key, val, next);
           this.parent = parent;
       }
       //...
}       


这里TreeNode与HashMap不同的是,它并不是直接转换为红黑树,而是把这些节点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。

③ 核心数据结构TreeBin

TreeBin也继承自Node,与TreeNode不同的是TreeBin在头部结点被使用(个人理解就是(n-1)&h)。其并没有持有key、value而是指向了TreeNodes和root结点。


TreeNodes used at the heads of bins. TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root. They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations.

static final class TreeBin<K,V> extends Node<K,V> {
   TreeNode<K,V> root;
   volatile TreeNode<K,V> first;
   volatile Thread waiter;
   volatile int lockState;
   // values for lockState
   static final int WRITER = 1; // set while holding write lock
   static final int WAITER = 2; // set when waiting for write lock
   static final int READER = 4; // increment value for setting read lock
   //...
}   

这个类并不负责包装用户的key、value值,而是包装很多TreeNode节点,他代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap数组中,存放的是 TreeBin对象,而不是TreeNode对象。 TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以 TreeBin就是封装TreeNode的容器,他提供转换红黑树的一些条件和锁的控制。


④ 构造函数

ConcurrentHashMap提供了一系列重载的构造函数:

//无参构造函数
public ConcurrentHashMap() {
}
//指定初始化容量的构造函数
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               // 1.5*initialCapacity  + 1
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
//给定Map进行初始化 sizeCtl 默认为16
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
//给定初始化容量和负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
  //  1 指的是并发级别
    this(initialCapacity, loadFactor, 1);
}
//给定初始化容量、负载因子以及并发级别
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
     //table.length至少是concurrencyLevel
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    //预估tab大小  类似于   initialCapacity + (initialCapacity >>> 1) + 1)
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // cap要么是MAXIMUM_CAPACITY ,要么是size的最近一个值---该值是2的N次幂
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // cap赋予    sizeCtl ,默认是tab初始化大小--此时tab[]还没有真正实例化
    this.sizeCtl = cap;
}

⑤ 关于结点hash说明

前面我们提到了几个常量,如下所示:

//结点哈希字段的编码
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
// 换成二进制则是: 0111 然后后面是7个 1111 也就是int的最大值 高位0表示符号位
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

其中MOVED应用在ForwardingNode<K,V> extends Node<K,V>中,其是对Node的扩展用于解决当进行扩容的时候,进行查询的问题。

ForwardingNode(Node<K,V>[] tab) {
  // 从这里可以看到,其 hash 为 -1
    super(MOVED, null, null, null);
    this.nextTable = tab;
}

ForwardingNode 的 key、value 和 next 都为 null, hash 为常量 MOVED。当迁移线程完成“一个桶”的全部元素的迁移后, 旧数组中该桶所在的位置会被赋成一个ForwardingNode。这样在迁移过程中, 如果有线程需要访问已经被迁移到 nextTable 的元素时, 可以通过 ForwadingNode 找到 nextTable。


TREEBIN 应用在TreeBin<K,V> extends Node<K,V>,用来表示tab[index]位置为红黑树时的头结点。

TreeBin(TreeNode<K,V> b) {
  // 从这里可以看出其 hash 为-2
    super(TREEBIN, null, null, null);
    //...
}    

RESERVED 应用在ReservationNode<K,V> extends Node<K,V>,用于解决当进行计算时,计算的对象为空的问题。

ReservationNode() {
  // 从这里可以看到,其 hash 为 -3
    super(RESERVED, null, null, null);
}

计算key的散列值方法如下:

// h 右移16位,然后与自身进行 &运算,最后再与HASH_BITS 进行 & 运算
  //h为key值得hash值,将高16位也参与运算,然后与int最大值进行&运算(效果为将值变为正数,其他位置不变)
//HASH_BITS为int最大值,最高位为0
//HashMap中没有处理为正数的步骤,这里负数有其它含义,查看节点类型
static final int spread(int h) {
   return (h ^ (h >>> 16)) & HASH_BITS;
}

从这里也可以看到出链表的结点hash是大于0的。

总结索引位置tab[(n-1)&hash]结点的hash值含义:

  • 链表:大于0
  • ForwardingNode:-1
  • 红黑树:-2
  • ReservationNode:-3


⑥ 关于SIZECTL & sizeCtl

sizeCtl是控制标识符,不同的值表示不同的意义:


负数代表正在进行初始化或者扩容操作,其中-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。

-N这个解释其实是不对的,实际上数组如果正在扩容, sizeCtl 的值是 resizeStamp(n) + 1+ 处于活跃状态的扩容线程数量,这个下面几个场景可以看出来。

整数或者0表示tab还没有被初始化,这个数值表示初始化或者下一次扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的,实际容量>=sizeCtl,则扩容。


sizeCtl初始化的场景

如下图所示,在ConcurrentHashMap初始化的时候也就是ConcurrentHashMap的构造函数中。


sizeCtl被赋值的场景

// 初始化开始时先赋值为 -1
U.compareAndSwapInt(this, SIZECTL, sc, -1)

如在初始化或扩容完成后,调整为0.75n:

扩容前后sizeCtl的更新

// 第一个线程扩容,更新 SIZECTL 为 (rs << RESIZE_STAMP_SHIFT) + 2
// 这里为啥不是 +1 ? 不理解
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
// 第N个线程扩容,更新 SIZECTL 为 sc + 1
U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)
// 当前线程扩容结束,更新 SIZECTL 为 sc-1
U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)

从这个扩容前后值的更新也能看出来,-N表示有N-1个线程正在进行扩容操作。


我们顺带研究一下(rs << RESIZE_STAMP_SHIFT) + 2,rs就是扩容戳,其是一个比较大的正数,从右往左数,第16位一定是1。那么rs << RESIZE_STAMP_SHIFT后是一个很大的负值, 其低 16 位的值都是 0。


所以 1+ 处于活跃状态的扩容线程数 与 rs << RESIZE_STAMP_SHIFT加和之后, 1+ 处于活跃状态的扩容线程数 的二进制形式都会保存在低 16位

【2】核心方法get

① get

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 根据key的hashcode得到hash值
    int h = spread(key.hashCode());
    // 通过(n - 1) & h 来定位索引位置,也就是位于tab的哪个index
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断索引位置的 e 是否满足: hash相等且key也相等
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // eh < 0 一定不是链表--红黑树遍历,Node e 可能真实为TreeNode
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //链表遍历    
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

流程梳理如下:

① 根据key的hashcode得到hash值,通过 (n - 1) & h 来定位索引位置,也就是位于tab的哪个index;

② e = tabAt(tab, (n - 1) & h),判断索引位置的 e 是否满足: hash相等且key也相等

③ 如果②不满足且结点是红黑树,那么进行树的查找;

④ 如果③不满足,则进行链表的查找

这里需要注意的是在进行定位、更新时均采用了sun.misc.Unsafe来保证volatile语义,其中casTabAt方法采用了CAS思想。代码如下所示:

// get put都是用了volatile
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
   return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
   U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
//采用CAS思想进行值的更新
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                   Node<K,V> c, Node<K,V> v) {
   return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

② 树的查找

树节点的查找本质与前面我们分析HashMap中getTreeNode流程一致,都是hash进行对比判断左右。

Node<K,V> find(int h, Object k) {
    return findTreeNode(h, k, null);
}
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
       if (k != null) {
           TreeNode<K,V> p = this;
           do  {
               int ph, dir; K pk; TreeNode<K,V> q;
               TreeNode<K,V> pl = p.left, pr = p.right;
               if ((ph = p.hash) > h)
                   p = pl;
               else if (ph < h)
                   p = pr;
               else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                   return p;
               else if (pl == null)
                   p = pr;
               else if (pr == null)
                   p = pl;
               else if ((kc != null ||
                         (kc = comparableClassFor(k)) != null) &&
                        (dir = compareComparables(kc, k, pk)) != 0)
                   p = (dir < 0) ? pl : pr;
               else if ((q = pr.findTreeNode(h, k, kc)) != null)
                   return q;
               else
                   p = pl;
           } while (p != null);
       }
       return null;
   }
}

comparableClassFor: 如果对象x的类是C,如果C实现了Comparable<C>接口,那么返回C,否则返回null。这个方法的作用就是查看对象x是否实现了Comparable接口


compareComparables: 如果x的类型是kc,返回k.compareTo(x)的比较结果。如果x为空,或者类型不是kc,返回0


pl 是p的左结点,pr是p的右结点。ph是p的hash,pk是p的k。h是目标key的hash(key)结果。kc是目标key的Class类型。

【3】核心方法put

① put

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 可以发现ConcurrentHashMap是不允许key value为null的
    if (key == null || value == null) throw new NullPointerException();
    // 获取hash值 (h ^ (h >>> 16)) & HASH_BITS
    int hash = spread(key.hashCode());
    int binCount = 0;
    //循环,也有自旋的含义
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //首先判断是否需要初始化tab,初始tab会采用compareAndSwapInt
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
    //如果目标位置没有元素,则采用CAS思想放到目标位置
    //f = tabAt(tab, i = (n - 1) & hash) 表示索引目标位置的结点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果正在扩容...也就是 f 为ForwardingNode类型
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 加锁synchronized 注意锁对象是f , 而不是tab[]
            synchronized (f) {
            //  必须确保 f = tabAt(tab, i = (n - 1) & hash)
                if (tabAt(tab, i) == f) {
                  // 说明是链表,尾插法
                    if (fh >= 0) {
                        binCount = 1;
                   //进行遍历,判断key是否已经存在 binCount用来记录遍历的链表结点个数
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 如果key不存在,则尾插入结点
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果是树,则触发putTreeVal向树中放入
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 判断是否需要将链表转为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                //树化,这里i = (n - 1) & hash),也就是槽位
                //需要说明的是,这里树化与HashMap不同,其交给了TreeBin来构造
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //统计总个数,这个里面可能会引起扩容
    addCount(1L, binCount);
    return null;
}

put方法简单梳理如下:

根据 key 的hashcode 计算出哈希值hash用于定位在tab[]的索引位置


判断tab是否需要进行初始化。


f = tabAt(tab, i = (n - 1) & hash) 表示索引目标位置的结点即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。


如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。当前线程也帮着进行扩容


如果都不满足,则利用 synchronized 锁写入数据。 加锁synchronized 注意锁对象是f即索引位置或者称之为“槽位”结点 , 而不是整个tab[] 或者jdk1.7中的Segment。


如果 binCount >= TREEIFY_THRESHOLD 则要转换为红黑树。


addCount方法记录元素量并可能进行扩容


binCount 的变量作用是什么?

  • 在操作链表的分支if (fh >= 0)中 用于统计链表长度
  • 在if (f instanceof TreeBin) 分支中看到, binCount=2 , 该值被直接赋值常量2


此外上面方法触发了如下几个关键方法:

  • initTable()
  • helpTransfer(tab, f);
  • putTreeVal(hash, key,value)
  • treeifyBin(tab, i)
  • addCount(1L, binCount);

我们接下来再逐个分析研究。


② tab的初始化


sizeCtl是表的初始容量,默认是16,也可以通过构造函数指定。


这个方法也是线程安全的,它没有加锁,但是通过sizeCtl的值来控制,初始化前修改sizeCtl为-1,初始化完成后再把sizeCtl的值修改为n - (n >>> 2)也就是0.75*n(负载因子默认就是0.75哦),比如默认初始化tab.length为16,那么这里最后sizeCtl就是12,也就是“阈值”。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    // 说明正在扩容或者初始化
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
         // 先修改为 -1 说明处于初始化   
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //初始化数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc修改为0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
              //修改sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

③ helpTransfer

在 putVal 方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位(tab[i]这个索引位置)有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度。


如果当前map正在扩容,那么Node<K,V> f结点一定是ForwardingNode。关于这一点可以在transfer方法里面看到。

// 如果发现有线程对链表或者树进行扩容,那么当前线程也需要帮助进行扩容。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 判断 f 是ForwardingNode类型,并获取nextTab,nextTable 一定不为null
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
    //获取扩容戳 假设 16 -> 32 扩容戳:1000 0000 0001 1011
        int rs = resizeStamp(tab.length);
    //(sc = sizeCtl) < 0表示正在初始化或者扩容
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //属于如下四种情况,直接break
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
             // 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
              //实际扩容方法
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}


while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0)是用来校验nextTable与table 没有被修改, (sc = sizeCtl) < 0)是用来校验正在扩容。

RESIZE_STAMP_BITS 默认是16
//扩容戳移位,默认16
RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS
// 65535
MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1


if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)用来判断四种情况:


如果(sc >>> RESIZE_STAMP_SHIFT) != rs ,说明当前线程获取到的扩容唯一标识戳 非 本批次扩容(这个不理解,这两个玩意左右恒不相等!个人理解是jdk想表达的意思)

或者 sc == rs + 1 (扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==rs <<16 位 + 2。当第一个线程结束扩容了,就会将 sc 减一,这个时候,sc =(rs<<16)+ 1;

或者 sc == rs + 65535 (如果达到最大帮助线程的数量就结束,应该是(rs << 16 )+65535)

或者transferIndex <= 0,transferIndex 在transfer方法中会被赋值,记录nextIndex,小于0说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干

其实这里感觉有点问题,假设第一个线程扩容时将SIZECTL修改为了(rs << RESIZE_STAMP_SHIFT) + 2,扩容结束修改为了(rs << RESIZE_STAMP_SHIFT) + 1。第二个线程使用sc == rs + 1进行判断,这个判断应该恒为false,正确的表达式应该是sc == ( rs << RESIZE_STAMP_SHIFT) + 1。


同样的道理,(sc >>> RESIZE_STAMP_SHIFT) != rs什么时候会为true?


这是个人理解,如有误,烦请指出,不胜感激。[补充说明]上面这个方法确实是有问题的,下面是openjdk12-ConcurrentHashMap.java中helpTransfer方法。

/**
 * Helps transfer if a resize is in progress.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // rs = resizeStamp(tab.length) << 16
        int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
             // 只有三个判断条件!!! 
            if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                transferIndex <= 0)
                break;
            if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

可以看到在jdk12中首先对 扩容戳进行了移位再得到rs,int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;,然后进行了三种情况的判断:


sc == rs + MAX_RESIZERS,帮助线程是否达到最大数量

sc == rs + 1,扩容是否结束

transferIndex <= 0,是否无需帮助进行扩容

④ resizeStamp生成扩容戳

这个方法是用来生成扩容戳,假设n是16,那么得到的结果是:32795。这个方法可以保证每次扩容都生成唯一的生成戳。每次新的扩容,都有一个不同的n,这个生成戳就是根据n来计算出来的一个数字,n不同,这个数字也不同。

// 左移 RESIZE_STAMP_SHIFT 位后, 要变成一个负值
static final int resizeStamp(int n) { 
// RESIZE_STAMP_BITS 默认是16
// (1 << (RESIZE_STAMP_BITS - 1) 也就是 1 左移 15位 =32768
  return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
}


Integer.numberOfLeadingZeros(n) 这个方法是返回无符号整数n最高位非0位前面的0的个数


比如10的二进制是 0000 0000 0000 0000 0000 0000 0000 1010


那么这个方法返回的值就是28 。


计算扩容戳的后半截计算逻辑是, 把 1 左移 (RESIZE_STAMP_BITS-1) (其值为 15) 位后, 与 numberOfLeadingZeros(n) 做或操作。 这样保证 resizeStamp 返回的 32 bit 结果的第 16 位一定是一个 1, 这样就保证了把这个 1 再左移 RESIZE_STAMP_SHIFT(其值为 16)后, resizeStamp 的第 32 bit , 是一个 1 , int 变量的最高位是符号位, 所以它就变成了负值。

rs<<RESIZE_STAMP_SHIFT 表示了什么?


rs是个高16位均为0,低16位记录了Integer.numberOfLeadingZeros(n)的数。左移RESIZE_STAMP_SHIFT也就是左移16后,其高16位也就是符号位1+Integer.numberOfLeadingZeros(n)。


其更新sizeCtl,最大值也就是rs<<RESIZE_STAMP_SHIFT+65535。也就是说rs<<RESIZE_STAMP_SHIFT的低16位即可存放扩容线程数量。


从这里我们也可以得出两个结论:

  • sizeCtl 为负值时,高16位bit反映了正在进行的扩容操作是针对哪个容量进行的
  • sizeCtl 为负值时, 低 16位bit 反映了参与此次扩容的线程有多少个


resizeStamp 为什么要满足左移RESIZE_STAMP_SHIFT 位 成为负值的性质 ?

一个线程首次进入 transfer 之前会将 sizeCtl 通过 CAS 方式更新为(rs<<RESIZE_STAMP_SHIFT) + 2, sizeCtl 是负值的时候表示当前数组正在扩容。


当扩容线程数量比较多的时候, 会导致sizeCtl符号位变化吗 ?

前面我们提到过当前进行扩容时,要么更新sizeCtl=(rs<<16)+2,要么更新sizeCtl=sc+1,这个过程要保证sizeCtl为负值。那么如果扩容线程数量很多时呢?会导致高位符号位变化吗?


答案是不会。


因为最大活跃线程数被另外一个常量 MAX_RESIZERS = 65535 控制, 65535 的 int 二进制形式是低 15 位全是 1。(resizeStamp<<16后) 的低16位完全可以存放最大活跃线程数。

在上面代码我们也可以看到当sc == rs<<16 + MAX_RESIZERS时,直接就break了,不会触发扩容。

因为最大活跃线程数被另外一个常量 MAX_RESIZERS = 65535 控制, 65535 的 int 二进制形式是低 15 位全是 1。(resizeStamp<<16后) 的低16位完全可以存放最大活跃线程数。

在上面代码我们也可以看到当sc == rs<<16 + MAX_RESIZERS时,直接就break了,不会触发扩容。

⑤ putTreeVal

如果研究过HashMap的树节点添加,那么下面这个方法还是很好理解的:

  • 如果从左右分支找到了目标结点,那么直接返回;
  • 如果找不到,那么就是新结点,首先要确定其parent;
  • 然后根据dir的值,放到左右子节点
  • 加锁,进行平衡插入,释放锁

整个过程和HashMap的不同之处就在于多了lockRoot();unlockRoot();

final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    Class<?> kc = null;
    // 记录左右子树是否find过
    boolean searched = false;
    //获取到root结点,然后无限循环直到return或者break
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        //如果p为null,那么直接作为根节点,返回
        if (p == null) {
            first = root = new TreeNode<K,V>(h, k, v, null, null);
            break;
        }
        // 判断是p的左分支还是右分支
        else if ((ph = p.hash) > h)
            dir = -1;
        else if (ph < h)
            dir = 1;
        // 如果 p 就是目标结点,直接返回    
        else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
            return p;
         // 尝试使用key的compareTo方法进行比较   
        else if ((kc == null &&
                  (kc = comparableClassFor(k)) == null) ||
                 (dir = compareComparables(kc, k, pk)) == 0) {
             //如果没有得到期望的结果(确定左右分支),且没有遍历过,
             // 那么就先进行左子树遍历后进行右子树遍历
            if (!searched) {
                TreeNode<K,V> q, ch;
                searched = true;
                if (((ch = p.left) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null) ||
                    ((ch = p.right) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null))
                    return q;
            }
            // 必须得到dir为1或者-1
            dir = tieBreakOrder(k, pk);
        }
        TreeNode<K,V> xp = p;
        // 如果dir < 0且p.left为null,那么就放在左分支;
        //如果dir > 0且p.right为null,那么就放在右分支;
        if ((p = (dir <= 0) ? p.left : p.right) == null) {
            TreeNode<K,V> x, f = first;
      //TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent)
            first = x = new TreeNode<K,V>(h, k, v, f, xp);
            if (f != null)
                f.prev = x;
            if (dir <= 0)
                xp.left = x;
            else
                xp.right = x;
            //如果父节点是黑色,那么插入的子节点为红色    
            if (!xp.red)
                x.red = true;
            else {
            // 加锁 CAS+自旋
                lockRoot();
                try {
                // 平衡插入
                    root = balanceInsertion(root, x);
                } finally {
                // 解锁
                    unlockRoot();
                }
            }
            break;
        }
    }
    assert checkInvariants(root);
    return null;
}


⑥ treeifyBin

如果table.length<64,那么触发tryPresize尝试进行扩容,否则就把所有链表结点替换为树结点最后形成一棵红黑树。方法入参中的index指的是索引位置(槽位)。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
    // 判断是否需要扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
         //b 是索引位置结点,b.hash >= 0说明是链表结点
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
        // 对槽位进行加锁
            synchronized (b) {
            // 再次检测,免得这个这时候槽位结点已经发生改变
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 从索引位置结点开始,一路next,将所有节点都替换为TreeNode
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 将new TreeBin<K,V>(hd)放到索引值
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}


这个方法最核心的地方就在于new TreeBin<K,V>(hd),这个对象实例化时构造函数会触发红黑树的构造。这个在博文认真学习jdk1.8下ConcurrentHashMap的扩容机制进行了阐述。


1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很好的。


可能会有疑问synchronized 是悲观锁、重量级锁,ReentrantLock 是显示锁,为什么使用了synchronized 呢?


这是因为JDK对synchronized 锁进行了优化,在jdk1.6对锁的实现引入了大量的优化。如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。所以,不要再认为synchronized 效率就一定比ReentrantLock 低,synchronized 现在效率也不低了。

目录
相关文章
|
7月前
|
存储 安全 Java
【JDK 源码分析】ConcurrentHashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】ConcurrentHashMap 底层结构
|
7月前
|
Oracle Java 编译器
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
95 1
|
4月前
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
58 2
|
7月前
|
算法 Java 索引
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
74 0
|
7月前
|
Oracle IDE Java
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)(下)
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
69 1
|
7月前
|
安全 Java 索引
认真学习jdk1.7下ConcurrentHashMap的实现原理
认真学习jdk1.7下ConcurrentHashMap的实现原理
120 0
|
7月前
|
机器学习/深度学习 存储 Java
认真学习jdk1.8下ConcurrentHashMap的扩容机制
认真学习jdk1.8下ConcurrentHashMap的扩容机制
145 0
|
3月前
|
Java
安装JDK18没有JRE环境的解决办法
安装JDK18没有JRE环境的解决办法
378 3
|
4月前
|
Oracle Java 关系型数据库
Mac安装JDK1.8
Mac安装JDK1.8
769 4
|
4月前
|
Java 关系型数据库 MySQL
"解锁Java Web传奇之旅:从JDK1.8到Tomcat,再到MariaDB,一场跨越数据库的冒险安装盛宴,挑战你的技术极限!"
【8月更文挑战第19天】在Linux上搭建Java Web应用环境,需安装JDK 1.8、Tomcat及MariaDB。本指南详述了使用apt-get安装OpenJDK 1.8的方法,并验证其版本。接着下载与解压Tomcat至`/usr/local/`目录,并启动服务。最后,通过apt-get安装MariaDB,设置基本安全配置。完成这些步骤后,即可验证各组件的状态,为部署Java Web应用打下基础。
64 1