1.7 已经解决了并发问题,并且能支持 N 个 Segment 这么多次数的并发,但依然存在 HashMap 在 1.7 版本中的问题—查询、遍历链表效率太低。jdk1.8 做了一些数据结构上的调整,先来看下底层的组成结构(其实和jdk1.8下HashMap的数据结构一致,就是数组+链表+红黑树):
其抛弃了原有的 Segment 分段锁(ReentrantLock),在put时采用了 CAS + synchronized 来保证并发安全性。
也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的,其中的 val , next 都用了 volatile 修饰,保证了内存可见性。
下文中的索引位置就是tab[(n-1)&hash]结点,也可以称之为“槽位”。
【1】核心属性和构造
① 核心属性
// 最大容量--key-value键值对的最大个数 32bit的最高两位用于控制目的 private static final int MAXIMUM_CAPACITY = 1 << 30; //哈希桶-table默认初始化大小 private static final int DEFAULT_CAPACITY = 16; //最大可能数组size(non-power of two),toArray 和关联方法使用 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默认并发级别,定义但没有被使用--为前个版本备留 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 负载因子,默认和HashMap一样 0.75 private static final float LOAD_FACTOR = 0.75f; // 链表树化的临界值 如果链表结点个数大于等于8,且tab.length>=64 就转为树 static final int TREEIFY_THRESHOLD = 8; /** * The bin count threshold for untreeifying a (split) bin during a * resize operation. Should be less than TREEIFY_THRESHOLD, and at * most 6 to mesh with shrinkage detection under removal. */ // 树转为链表的临界值,结点个数小于等于6,就转为链表 static final int UNTREEIFY_THRESHOLD = 6; // 树化的一个前提条件-数组.length>=64,否则将扩容而不是树化 static final int MIN_TREEIFY_CAPACITY = 64; /** * Minimum number of rebinnings per transfer step. Ranges are * subdivided to allow multiple resizer threads. This value * serves as a lower bound to avoid resizers encountering * excessive memory contention. The value should be at least * DEFAULT_CAPACITY. */ private static final int MIN_TRANSFER_STRIDE = 16; //sizeCtl中用于生成戳的位数。对于32位阵列,必须至少为6。 private static int RESIZE_STAMP_BITS = 16; //可以帮助resize的最大线程数。必须适合32位-RESIZE_STAMP_BITS位。 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //在sizeCtl中记录大小戳(size stamp)的偏移位。 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //结点哈希字段的编码 static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash //CPU个数 static final int NCPU = Runtime.getRuntime().availableProcessors(); // 上面的都是静态常量,下面的是成员变量 均使用了volatile 修饰--------------------- //常见的哈希桶,当第一次插入数据时进行初始化,大小为2的N次幂 transient volatile Node<K,V>[] table; //扩容后的数组,其实就是HashMap扩容过程中的newTab //扩容结束后,该数组被置为null //在transfer中被初始化 private transient volatile Node<K,V>[] nextTable; //基本计数器值,主要在没有争用时使用,但在表初始化竞争期间也用作后备。通过CAS更新。 private transient volatile long baseCount; // table初始化或者扩容控制 。 //如果为负,则表正在初始化或扩容: //* -1表示初始化 //* 或者就是-(1+活跃的扩容现场个数) //否则,当table为null,将保留创建时使用的初始表大小,或默认为0 //初始化后,保存下一个要调整表大小的元素计数值--也就是阈值/临界值。 private transient volatile int sizeCtl; // 扩容时,下一个table index(plus one) private transient volatile int transferIndex; // 自旋锁(通过CAS锁定) 当扩容或者创建CounterCells使用 private transient volatile int cellsBusy; //非空时,大小为2的N次幂 private transient volatile CounterCell[] counterCells; // views private transient KeySetView<K,V> keySet; private transient ValuesView<K,V> values; private transient EntrySetView<K,V> entrySet;
② 核心数据结构Node
与jdk1.7的ConcurrentHashMap不同的时,这里将HashEntry换成了Node。以便树化的时候采用TreeNode结点。Node的成员与HashEntry一致,都是hash、key、val、next。并且val和next使用了volatile 修饰。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } //... }
其子类TreeNode用来表示红黑树中的树结点,其与HashMap中树结点结构是一致的。
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } //... }
这里TreeNode与HashMap不同的是,它并不是直接转换为红黑树,而是把这些节点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。
③ 核心数据结构TreeBin
TreeBin也继承自Node,与TreeNode不同的是TreeBin在头部结点被使用(个人理解就是(n-1)&h)。其并没有持有key、value而是指向了TreeNodes和root结点。
TreeNodes used at the heads of bins. TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root. They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations.
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock //... }
这个类并不负责包装用户的key、value值,而是包装很多TreeNode节点,他代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap数组中,存放的是 TreeBin对象,而不是TreeNode对象。 TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以 TreeBin就是封装TreeNode的容器,他提供转换红黑树的一些条件和锁的控制。
④ 构造函数
ConcurrentHashMap提供了一系列重载的构造函数:
//无参构造函数 public ConcurrentHashMap() { } //指定初始化容量的构造函数 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : // 1.5*initialCapacity + 1 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //给定Map进行初始化 sizeCtl 默认为16 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } //给定初始化容量和负载因子 public ConcurrentHashMap(int initialCapacity, float loadFactor) { // 1 指的是并发级别 this(initialCapacity, loadFactor, 1); } //给定初始化容量、负载因子以及并发级别 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //table.length至少是concurrencyLevel if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads //预估tab大小 类似于 initialCapacity + (initialCapacity >>> 1) + 1) long size = (long)(1.0 + (long)initialCapacity / loadFactor); // cap要么是MAXIMUM_CAPACITY ,要么是size的最近一个值---该值是2的N次幂 int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); // cap赋予 sizeCtl ,默认是tab初始化大小--此时tab[]还没有真正实例化 this.sizeCtl = cap; }
⑤ 关于结点hash说明
前面我们提到了几个常量,如下所示:
//结点哈希字段的编码 static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations // 换成二进制则是: 0111 然后后面是7个 1111 也就是int的最大值 高位0表示符号位 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
其中MOVED应用在ForwardingNode<K,V> extends Node<K,V>
中,其是对Node的扩展用于解决当进行扩容的时候,进行查询的问题。
ForwardingNode(Node<K,V>[] tab) { // 从这里可以看到,其 hash 为 -1 super(MOVED, null, null, null); this.nextTable = tab; }
ForwardingNode 的 key、value 和 next 都为 null, hash 为常量 MOVED。当迁移线程完成“一个桶”的全部元素的迁移后, 旧数组中该桶所在的位置会被赋成一个ForwardingNode。这样在迁移过程中, 如果有线程需要访问已经被迁移到 nextTable 的元素时, 可以通过 ForwadingNode 找到 nextTable。
TREEBIN 应用在TreeBin<K,V> extends Node<K,V>,用来表示tab[index]位置为红黑树时的头结点。
TreeBin(TreeNode<K,V> b) { // 从这里可以看出其 hash 为-2 super(TREEBIN, null, null, null); //... }
RESERVED 应用在ReservationNode<K,V> extends Node<K,V>
,用于解决当进行计算时,计算的对象为空的问题。
ReservationNode() { // 从这里可以看到,其 hash 为 -3 super(RESERVED, null, null, null); }
计算key的散列值方法如下:
// h 右移16位,然后与自身进行 &运算,最后再与HASH_BITS 进行 & 运算 //h为key值得hash值,将高16位也参与运算,然后与int最大值进行&运算(效果为将值变为正数,其他位置不变) //HASH_BITS为int最大值,最高位为0 //HashMap中没有处理为正数的步骤,这里负数有其它含义,查看节点类型 static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
从这里也可以看到出链表的结点hash是大于0的。
总结索引位置tab[(n-1)&hash]
结点的hash值含义:
- 链表:大于0
- ForwardingNode:-1
- 红黑树:-2
- ReservationNode:-3
⑥ 关于SIZECTL & sizeCtl
sizeCtl是控制标识符,不同的值表示不同的意义:
负数代表正在进行初始化或者扩容操作,其中-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。
-N这个解释其实是不对的,实际上数组如果正在扩容, sizeCtl 的值是 resizeStamp(n) + 1+ 处于活跃状态的扩容线程数量,这个下面几个场景可以看出来。
整数或者0表示tab还没有被初始化,这个数值表示初始化或者下一次扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的,实际容量>=sizeCtl,则扩容。
sizeCtl初始化的场景
如下图所示,在ConcurrentHashMap初始化的时候也就是ConcurrentHashMap的构造函数中。
sizeCtl被赋值的场景
// 初始化开始时先赋值为 -1 U.compareAndSwapInt(this, SIZECTL, sc, -1)
如在初始化或扩容完成后,调整为0.75n:
扩容前后sizeCtl的更新
// 第一个线程扩容,更新 SIZECTL 为 (rs << RESIZE_STAMP_SHIFT) + 2 // 这里为啥不是 +1 ? 不理解 U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2) // 第N个线程扩容,更新 SIZECTL 为 sc + 1 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) // 当前线程扩容结束,更新 SIZECTL 为 sc-1 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
从这个扩容前后值的更新也能看出来,-N表示有N-1个线程正在进行扩容操作。
我们顺带研究一下(rs << RESIZE_STAMP_SHIFT) + 2,rs就是扩容戳,其是一个比较大的正数,从右往左数,第16位一定是1。那么rs << RESIZE_STAMP_SHIFT后是一个很大的负值, 其低 16 位的值都是 0。
所以 1+ 处于活跃状态的扩容线程数 与 rs << RESIZE_STAMP_SHIFT加和之后, 1+ 处于活跃状态的扩容线程数 的二进制形式都会保存在低 16位
【2】核心方法get
① get
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 根据key的hashcode得到hash值 int h = spread(key.hashCode()); // 通过(n - 1) & h 来定位索引位置,也就是位于tab的哪个index if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判断索引位置的 e 是否满足: hash相等且key也相等 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // eh < 0 一定不是链表--红黑树遍历,Node e 可能真实为TreeNode else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //链表遍历 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
流程梳理如下:
① 根据key的hashcode得到hash值,通过 (n - 1) & h 来定位索引位置,也就是位于tab的哪个index;
② e = tabAt(tab, (n - 1) & h),判断索引位置的 e 是否满足: hash相等且key也相等
③ 如果②不满足且结点是红黑树,那么进行树的查找;
④ 如果③不满足,则进行链表的查找
这里需要注意的是在进行定位、更新时均采用了sun.misc.Unsafe来保证volatile语义,其中casTabAt方法采用了CAS思想。代码如下所示:
// get put都是用了volatile static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } //采用CAS思想进行值的更新 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
② 树的查找
树节点的查找本质与前面我们分析HashMap中getTreeNode流程一致,都是hash进行对比判断左右。
Node<K,V> find(int h, Object k) { return findTreeNode(h, k, null); } final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { TreeNode<K,V> p = this; do { int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } }
comparableClassFor: 如果对象x的类是C,如果C实现了Comparable<C>接口,那么返回C,否则返回null。这个方法的作用就是查看对象x是否实现了Comparable接口
compareComparables: 如果x的类型是kc,返回k.compareTo(x)的比较结果。如果x为空,或者类型不是kc,返回0
pl 是p的左结点,pr是p的右结点。ph是p的hash,pk是p的k。h是目标key的hash(key)结果。kc是目标key的Class类型。
【3】核心方法put
① put
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // 可以发现ConcurrentHashMap是不允许key value为null的 if (key == null || value == null) throw new NullPointerException(); // 获取hash值 (h ^ (h >>> 16)) & HASH_BITS int hash = spread(key.hashCode()); int binCount = 0; //循环,也有自旋的含义 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //首先判断是否需要初始化tab,初始tab会采用compareAndSwapInt if (tab == null || (n = tab.length) == 0) tab = initTable(); //如果目标位置没有元素,则采用CAS思想放到目标位置 //f = tabAt(tab, i = (n - 1) & hash) 表示索引目标位置的结点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果正在扩容...也就是 f 为ForwardingNode类型 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 加锁synchronized 注意锁对象是f , 而不是tab[] synchronized (f) { // 必须确保 f = tabAt(tab, i = (n - 1) & hash) if (tabAt(tab, i) == f) { // 说明是链表,尾插法 if (fh >= 0) { binCount = 1; //进行遍历,判断key是否已经存在 binCount用来记录遍历的链表结点个数 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 如果key不存在,则尾插入结点 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果是树,则触发putTreeVal向树中放入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 判断是否需要将链表转为红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //树化,这里i = (n - 1) & hash),也就是槽位 //需要说明的是,这里树化与HashMap不同,其交给了TreeBin来构造 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //统计总个数,这个里面可能会引起扩容 addCount(1L, binCount); return null; }
put方法简单梳理如下:
根据 key 的hashcode 计算出哈希值hash用于定位在tab[]的索引位置
判断tab是否需要进行初始化。
f = tabAt(tab, i = (n - 1) & hash) 表示索引目标位置的结点即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。当前线程也帮着进行扩容
如果都不满足,则利用 synchronized 锁写入数据。 加锁synchronized 注意锁对象是f即索引位置或者称之为“槽位”结点 , 而不是整个tab[] 或者jdk1.7中的Segment。
如果 binCount >= TREEIFY_THRESHOLD 则要转换为红黑树。
addCount方法记录元素量并可能进行扩容
binCount 的变量作用是什么?
- 在操作链表的分支if (fh >= 0)中 用于统计链表长度
- 在if (f instanceof TreeBin) 分支中看到, binCount=2 , 该值被直接赋值常量2
此外上面方法触发了如下几个关键方法:
- initTable()
- helpTransfer(tab, f);
- putTreeVal(hash, key,value)
- treeifyBin(tab, i)
- addCount(1L, binCount);
我们接下来再逐个分析研究。
② tab的初始化
sizeCtl是表的初始容量,默认是16,也可以通过构造函数指定。
这个方法也是线程安全的,它没有加锁,但是通过sizeCtl的值来控制,初始化前修改sizeCtl为-1,初始化完成后再把sizeCtl的值修改为n - (n >>> 2)也就是0.75*n(负载因子默认就是0.75哦),比如默认初始化tab.length为16,那么这里最后sizeCtl就是12,也就是“阈值”。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 说明正在扩容或者初始化 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // 先修改为 -1 说明处于初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //初始化数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // sc修改为0.75n sc = n - (n >>> 2); } } finally { //修改sizeCtl sizeCtl = sc; } break; } } return tab; }
③ helpTransfer
在 putVal 方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位(tab[i]这个索引位置
)有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度。
如果当前map正在扩容,那么Node<K,V> f
结点一定是ForwardingNode。关于这一点可以在transfer方法里面看到。
// 如果发现有线程对链表或者树进行扩容,那么当前线程也需要帮助进行扩容。 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 判断 f 是ForwardingNode类型,并获取nextTab,nextTable 一定不为null if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //获取扩容戳 假设 16 -> 32 扩容戳:1000 0000 0001 1011 int rs = resizeStamp(tab.length); //(sc = sizeCtl) < 0表示正在初始化或者扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //属于如下四种情况,直接break if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容) if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //实际扩容方法 transfer(tab, nextTab); break; } } return nextTab; } return table; }
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0)
是用来校验nextTable与table 没有被修改, (sc = sizeCtl) < 0)是用来校验正在扩容。
RESIZE_STAMP_BITS 默认是16 //扩容戳移位,默认16 RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS // 65535 MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)用来判断四种情况:
如果(sc >>> RESIZE_STAMP_SHIFT) != rs ,说明当前线程获取到的扩容唯一标识戳 非 本批次扩容(这个不理解,这两个玩意左右恒不相等!个人理解是jdk想表达的意思)
或者 sc == rs + 1 (扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==rs <<16 位 + 2。当第一个线程结束扩容了,就会将 sc 减一,这个时候,sc =(rs<<16)+ 1;
或者 sc == rs + 65535 (如果达到最大帮助线程的数量就结束,应该是(rs << 16 )+65535)
或者transferIndex <= 0,transferIndex 在transfer方法中会被赋值,记录nextIndex,小于0说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干
其实这里感觉有点问题,假设第一个线程扩容时将SIZECTL修改为了(rs << RESIZE_STAMP_SHIFT) + 2,扩容结束修改为了(rs << RESIZE_STAMP_SHIFT) + 1。第二个线程使用sc == rs + 1进行判断,这个判断应该恒为false,正确的表达式应该是sc == ( rs << RESIZE_STAMP_SHIFT) + 1。
同样的道理,(sc >>> RESIZE_STAMP_SHIFT) != rs什么时候会为true?
这是个人理解,如有误,烦请指出,不胜感激。[补充说明]上面这个方法确实是有问题的,下面是openjdk12-ConcurrentHashMap.java中helpTransfer方法。
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // rs = resizeStamp(tab.length) << 16 int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT; while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 只有三个判断条件!!! if (sc == rs + MAX_RESIZERS || sc == rs + 1 || transferIndex <= 0) break; if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
可以看到在jdk12中首先对 扩容戳进行了移位再得到rs,int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;,然后进行了三种情况的判断:
sc == rs + MAX_RESIZERS,帮助线程是否达到最大数量
sc == rs + 1,扩容是否结束
transferIndex <= 0,是否无需帮助进行扩容
④ resizeStamp生成扩容戳
这个方法是用来生成扩容戳,假设n是16,那么得到的结果是:32795。这个方法可以保证每次扩容都生成唯一的生成戳。每次新的扩容,都有一个不同的n,这个生成戳就是根据n来计算出来的一个数字,n不同,这个数字也不同。
// 左移 RESIZE_STAMP_SHIFT 位后, 要变成一个负值 static final int resizeStamp(int n) { // RESIZE_STAMP_BITS 默认是16 // (1 << (RESIZE_STAMP_BITS - 1) 也就是 1 左移 15位 =32768 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
Integer.numberOfLeadingZeros(n) 这个方法是返回无符号整数n最高位非0位前面的0的个数
比如10的二进制是 0000 0000 0000 0000 0000 0000 0000 1010
那么这个方法返回的值就是28 。
计算扩容戳的后半截计算逻辑是, 把 1 左移 (RESIZE_STAMP_BITS-1) (其值为 15) 位后, 与 numberOfLeadingZeros(n) 做或操作。 这样保证 resizeStamp 返回的 32 bit 结果的第 16 位一定是一个 1, 这样就保证了把这个 1 再左移 RESIZE_STAMP_SHIFT(其值为 16)后, resizeStamp 的第 32 bit , 是一个 1 , int 变量的最高位是符号位, 所以它就变成了负值。
rs<<RESIZE_STAMP_SHIFT
表示了什么?
rs是个高16位均为0,低16位记录了Integer.numberOfLeadingZeros(n)的数。左移RESIZE_STAMP_SHIFT也就是左移16后,其高16位也就是符号位1+Integer.numberOfLeadingZeros(n)。
其更新sizeCtl,最大值也就是rs<<RESIZE_STAMP_SHIFT+65535。也就是说rs<<RESIZE_STAMP_SHIFT的低16位即可存放扩容线程数量。
从这里我们也可以得出两个结论:
- sizeCtl 为负值时,高16位bit反映了正在进行的扩容操作是针对哪个容量进行的
- sizeCtl 为负值时, 低 16位bit 反映了参与此次扩容的线程有多少个
resizeStamp 为什么要满足左移RESIZE_STAMP_SHIFT 位 成为负值的性质 ?
一个线程首次进入 transfer 之前会将 sizeCtl 通过 CAS 方式更新为(rs<<RESIZE_STAMP_SHIFT) + 2, sizeCtl 是负值的时候表示当前数组正在扩容。
当扩容线程数量比较多的时候, 会导致sizeCtl符号位变化吗 ?
前面我们提到过当前进行扩容时,要么更新sizeCtl=(rs<<16)+2,要么更新sizeCtl=sc+1,这个过程要保证sizeCtl为负值。那么如果扩容线程数量很多时呢?会导致高位符号位变化吗?
答案是不会。
因为最大活跃线程数被另外一个常量 MAX_RESIZERS = 65535 控制, 65535 的 int 二进制形式是低 15 位全是 1。(resizeStamp<<16后) 的低16位完全可以存放最大活跃线程数。
在上面代码我们也可以看到当sc == rs<<16 + MAX_RESIZERS时,直接就break了,不会触发扩容。
因为最大活跃线程数被另外一个常量 MAX_RESIZERS = 65535 控制, 65535 的 int 二进制形式是低 15 位全是 1。(resizeStamp<<16后) 的低16位完全可以存放最大活跃线程数。
在上面代码我们也可以看到当sc == rs<<16 + MAX_RESIZERS时,直接就break了,不会触发扩容。
⑤ putTreeVal
如果研究过HashMap的树节点添加,那么下面这个方法还是很好理解的:
- 如果从左右分支找到了目标结点,那么直接返回;
- 如果找不到,那么就是新结点,首先要确定其parent;
- 然后根据dir的值,放到左右子节点
- 加锁,进行平衡插入,释放锁
整个过程和HashMap的不同之处就在于多了lockRoot();
和unlockRoot();
。
final TreeNode<K,V> putTreeVal(int h, K k, V v) { Class<?> kc = null; // 记录左右子树是否find过 boolean searched = false; //获取到root结点,然后无限循环直到return或者break for (TreeNode<K,V> p = root;;) { int dir, ph; K pk; //如果p为null,那么直接作为根节点,返回 if (p == null) { first = root = new TreeNode<K,V>(h, k, v, null, null); break; } // 判断是p的左分支还是右分支 else if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; // 如果 p 就是目标结点,直接返回 else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; // 尝试使用key的compareTo方法进行比较 else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { //如果没有得到期望的结果(确定左右分支),且没有遍历过, // 那么就先进行左子树遍历后进行右子树遍历 if (!searched) { TreeNode<K,V> q, ch; searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } // 必须得到dir为1或者-1 dir = tieBreakOrder(k, pk); } TreeNode<K,V> xp = p; // 如果dir < 0且p.left为null,那么就放在左分支; //如果dir > 0且p.right为null,那么就放在右分支; if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode<K,V> x, f = first; //TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) first = x = new TreeNode<K,V>(h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; //如果父节点是黑色,那么插入的子节点为红色 if (!xp.red) x.red = true; else { // 加锁 CAS+自旋 lockRoot(); try { // 平衡插入 root = balanceInsertion(root, x); } finally { // 解锁 unlockRoot(); } } break; } } assert checkInvariants(root); return null; }
⑥ treeifyBin
如果table.length<64
,那么触发tryPresize尝试进行扩容,否则就把所有链表结点替换为树结点最后形成一棵红黑树。方法入参中的index指的是索引位置(槽位)。
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // 判断是否需要扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //b 是索引位置结点,b.hash >= 0说明是链表结点 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 对槽位进行加锁 synchronized (b) { // 再次检测,免得这个这时候槽位结点已经发生改变 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; // 从索引位置结点开始,一路next,将所有节点都替换为TreeNode for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 将new TreeBin<K,V>(hd)放到索引值 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
这个方法最核心的地方就在于new TreeBin<K,V>(hd),这个对象实例化时构造函数会触发红黑树的构造。这个在博文认真学习jdk1.8下ConcurrentHashMap的扩容机制进行了阐述。
1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很好的。
可能会有疑问synchronized 是悲观锁、重量级锁,ReentrantLock 是显示锁,为什么使用了synchronized 呢?
这是因为JDK对synchronized 锁进行了优化,在jdk1.6对锁的实现引入了大量的优化。如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。所以,不要再认为synchronized 效率就一定比ReentrantLock 低,synchronized 现在效率也不低了。