3.1.8 size()操作
如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。
因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2(RETRIES_BEFORE_LOCK)次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { //尝试2 次通过不锁住Segment的方式来统计各个Segment大小 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } //如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment //的大小。 if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
详细源码注释可以看
3.2 jdk1.8的源码
3.2.1主要数据结构
1.8中放弃了Segment
臃肿的设计,取而代之的是采用Node
+ CAS
+ Synchronized
来保证并发安全进行实现,结构如下:
ConcurrentHashMap在put操作时,如果发现链表结构中的元素超过了TREEIFY_THRESHOLD(默认为8),则会把链表转换为红黑树,已便于提高查询效率。代码如下:
if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i);
ConcurrentHashMap,提供了许多内部类来进行辅助实现,如Node,TreeNode,TreeBin等等。下面我们就一起来看看ConcurrentHashMap几个重要的内部类。
ConcurrentHashMap的成员变量
// 最大容量:2^30=1073741824 private static final int MAXIMUM_CAPACITY = 1 << 30; // 默认初始值,必须是2的幕数 private static final int DEFAULT_CAPACITY = 16; // static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // private static final float LOAD_FACTOR = 0.75f; // 链表转红黑树阀值,> 8 链表转换为红黑树 static final int TREEIFY_THRESHOLD = 8; // 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数 //量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; // static final int MIN_TREEIFY_CAPACITY = 64; // private static final int MIN_TRANSFER_STRIDE = 16; // private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中记录size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 树根节点的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 可用处理器数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); // 用来存放Node节点数据的,默认为null,默认大小为16的数组,每次扩容时大小总是2的幂次方; transient volatile Node<K, V>[] table; // 扩容时新生成的数据,数组为table的两倍; private transient volatile Node<K, V>[] nextTable;
Node
在Node内部类中,其属性value、next都是带有volatile的。同时其对value的setter方法进行了特殊处理,不允许直接调用其setter方法来修改value的值。最后Node还提供了find方法来赋值map.get()。
static class Node<K, V> implements Map.Entry<K, V> { final int hash; final K key; //get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。 volatile V val; volatile Node<K, V> next; Node(int hash, K key, V val, Node<K, V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString() { return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?, ?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?, ?>) o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K, V> find(int h, Object k) { Node<K, V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
TreeNode
我们在学习HashMap的时候就知道,HashMap的核心数据结构就是链表。在ConcurrentHashMap中就不一样了,如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。所以TreeNode也必须是ConcurrentHashMap的一个核心类,其为树节点类,定义如下:
static final class TreeNode<K, V> extends Node<K, V> { TreeNode<K, V> parent; // red-black tree links TreeNode<K, V> left; TreeNode<K, V> right; TreeNode<K, V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K, V> next, TreeNode<K, V> parent) { super(hash, key, val, next); this.parent = parent; } Node<K, V> find(int h, Object k) { return findTreeNode(h, k, null); } /** * 查找hash为h,key为k的节点 */ final TreeNode<K, V> findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { TreeNode<K, V> p = this; do { int ph, dir; K pk; TreeNode<K, V> q; TreeNode<K, V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } }
源码展示TreeNode继承Node,且提供了findTreeNode用来查找查找hash为h,key为k的节点。
TreeBin
该类并不负责key-value的键值对包装,它用于在链表转换为红黑树时包装TreeNode节点,也就是说ConcurrentHashMap红黑树存放是TreeBin,不是TreeNode。该类封装了一系列的方法,包括putTreeVal、lookRoot、UNlookRoot、remove、balanceInsetion、balanceDeletion。由于TreeBin的代码太长我们这里只展示构造方法(构造方法就是构造红黑树的过程):
static final class TreeBin<K, V> extends Node<K, V> { TreeNode<K, V> root; volatile TreeNode<K, V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a.getClass().getName().compareTo(b.getClass().getName())) == 0) d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1); return d; } //构造方法就是在构造一个红黑树的过程。 TreeBin(TreeNode<K, V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K, V> r = null; for (TreeNode<K, V> x = b, next; x != null; x = next) { next = (TreeNode<K, V>) x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K, V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K, V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); } private final void lockRoot() { //略 } private final void unlockRoot() { //略 } private final void contendedLock() { //略 } final Node<K, V> find(int h, Object k) { //略 } final TreeNode<K, V> putTreeVal(int h, K k, V v) { //略 } final boolean removeTreeNode(TreeNode<K, V> p) { //略 } static <K, V> TreeNode<K, V> rotateLeft(TreeNode<K, V> root, TreeNode<K, V> p) { //略 } static <K, V> TreeNode<K, V> rotateRight(TreeNode<K, V> root, TreeNode<K, V> p) { //略 } static <K, V> TreeNode<K, V> balanceInsertion(TreeNode<K, V> root, TreeNode<K, V> x) { //略 } static <K, V> TreeNode<K, V> balanceDeletion(TreeNode<K, V> root, TreeNode<K, V> x) { //略 } static <K, V> boolean checkInvariants(TreeNode<K, V> t) { //略 } }