为什么要用CurrentHashmap
我们大家都知道,HashMap是线程不安全的,那么为了解决HashMap线程不安全有很多方案,比如
- 使用Collections.synchronizedMap(Map)创建线程安全的map集合;
- Hashtable
- ConcurrentHashMap
Collections.synchronizedMap(Map)的方法,很明显都加了synchronized来保证线程安全
Hashtable是给所有方法增加synchronized,这样牺牲了并发,效率并不高,于是出现了ConcurrentHashMap,synchronized+CAS去实现线程安全,效率更高一些
环境
本文的CurrentHashmap基于JDK1.8来讲述;
CurrentHashmap在JDK1.7和JDK1.8有些差别
- 在JDK1.7中ConcurrentHashMap采用了数组+Segment+分段锁的方式实现。
- JDK1.8中是数组+链表,链表在数据过多时转为红黑树
存储结构图
CurrentHashmap的结构图,数组+链表,链表在数据过多时转为红黑树
- 负载因子75%,就是说元素个数超过数组长度的0.75时就会发生扩容
- 链表上的元素超过7时就会转成红黑树(平衡二叉树存储)
扩容
为什么要扩容?
由上图分析,当数组保存的链表越来越多时,新来的数据大概率会被保存在现有的链表中,随着链表越来越长,哈希查找的速度逐渐由O(1)趋向O(n/2),
所以就要进行扩容,在数组的元素个数超过0.75时进行扩容,才能有效解决这个问题。
另外 ConcurrentHashMap 还会有链表转红黑树的操作,以提高查找的速度,红黑树时间复杂度为 O (logn),而链表是 O (n/2),因此只在 O (logn)<O (n/2) 时才会进行转换,也就是以 8 作为分界点。
ConcurrentHashMap 1.7 源码解析
ConcurrentHashMap 的一些成员变量
//默认初始化容量,这个和 HashMap中的容量是一个概念,表示的是整个 Map的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认的并发级别,这个参数决定了 Segment 数组的长度 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大的容量 static final int MAXIMUM_CAPACITY = 1 << 30; //每个Segment中table数组的最小长度为2,且必须是2的n次幂。 //由于每个Segment是懒加载的,用的时候才会初始化,因此为了避免使用时立即调整大小,设定了最小容量2 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //用于限制Segment数量的最大值,必须是2的n次幂 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative //在size方法和containsValue方法,会优先采用乐观的方式不加锁,直到重试次数达到2,才会对所有Segment加锁 //这个值的设定,是为了避免无限次的重试。后边size方法会详讲怎么实现乐观机制的。 static final int RETRIES_BEFORE_LOCK = 2; //segment掩码值,用于根据元素的hash值定位所在的 Segment 下标。后边会细讲 final int segmentMask; //和 segmentMask 配合使用来定位 Segment 的数组下标,后边讲。 final int segmentShift; // Segment 组成的数组,每一个 Segment 都可以看做是一个特殊的 HashMap final Segment<K,V>[] segments; //Segment 对象,继承自 ReentrantLock 可重入锁。 //其内部的属性和方法和 HashMap 神似,只是多了一些拓展功能。 static final class Segment<K,V> extends ReentrantLock implements Serializable { //这是在 scanAndLockForPut 方法中用到的一个参数,用于计算最大重试次数 //获取当前可用的处理器的数量,若大于1,则返回64,否则返回1。 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; //用于表示每个Segment中的 table,是一个用HashEntry组成的数组。 transient volatile HashEntry<K,V>[] table; //Segment中的元素个数,每个Segment单独计数(下边的几个参数同样的都是单独计数) transient int count; //每次 table 结构修改时,如put,remove等,此变量都会自增 transient int modCount; //当前Segment扩容的阈值,同HashMap计算方法一样也是容量乘以加载因子 //需要知道的是,每个Segment都是单独处理扩容的,互相之间不会产生影响 transient int threshold; //加载因子 final float loadFactor; //Segment构造函数 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } ... // put(),remove(),rehash() 方法都在此类定义 } // HashEntry,存在于每个Segment中,它就类似于HashMap中的Node,用于存储键值对的具体数据和维护单向链表的关系 static final class HashEntry<K,V> { //每个key通过哈希运算后的结果,用的是 Wang/Jenkins hash 的变种算法,此处不细讲,感兴趣的可自行查阅相关资料 final int hash; final K key; //value和next都用 volatile 修饰,用于保证内存可见性和禁止指令重排序 volatile V value; //指向下一个节点 volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } }
可以看出1.7的1.8的区别并不大,默认容量都是16,负载因子0.75等等,区别在于1.7多了一个Segment分段锁,1.8锁的是Node,锁的粒度变小,并发性提高冲突变少
put方法
//这是Map的put方法 public V put(K key, V value) { Segment<K,V> s; //不支持value为空 if (value == null) throw new NullPointerException(); //通过 Wang/Jenkins 算法的一个变种算法,计算出当前key对应的hash值 int hash = hash(key); //上边我们计算出的 segmentShift为28,因此hash值右移28位,说明此时用的是hash的高4位, //然后把它和掩码15进行与运算,得到的值一定是一个 0000 ~ 1111 范围内的值,即 0~15 。 int j = (hash >>> segmentShift) & segmentMask; //这里是用Unsafe类的原子操作找到Segment数组中j下标的 Segment 对象 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //初始化j下标的Segment s = ensureSegment(j); //在此Segment中添加元素 return s.put(key, hash, value, false); }
ConcurrentHashMap 1.8 源码解析
put方法的源码
可以看出,实际上调用的是 putVal 方法,第三个参数传入 false,控制 key 存在时覆盖原来的值。
putVal方法的源码
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 判断要添加的key或者value是否为空,为空抛出异常 if (key == null || value == null) throw new NullPointerException(); // key的hashCode算出hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table为空,那么就调用initTable初始化table if (tab == null || (n = tab.length) == 0) // 如果table为空,那么就调用initTable初始化table tab = initTable(); //通过hash 值计算table 中的索引,如果该位置没有数据,则可以put // tabAt方法以volatile读的方式读取table数组中的元素 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //casTabAt方法以CAS的方式,将元素插入table数组 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果table位置上的节点状态时MOVE,则表明hash 正在进行扩容搬移数据的过程中 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);// 帮助扩容 // hash 表该位置上有数据,可能是链表,也可能是一颗树 else { V oldVal = null; synchronized (f) { // 上锁后,只有再该位置数据和上锁前一致才进行,否则需要重新循环 if (tabAt(tab, i) == f) { // hash 值>=0 表明这是一个链表结构 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 存在一样的key就覆盖它的value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 不存在该key,将新数据添加到链表尾 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 该位置是红黑树,是TreeBin对象(注意是TreeBin,而不是TreeNode) else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; //通过TreeBin 中的方法,将数据添加到红黑树中 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //if 成立,说明遍历的是链表结构,并且超过了阀值,需要将链表转换为树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i);//将table 索引i 的位置上的链表转换为红黑树 if (oldVal != null) return oldVal; break; } } } // ConcurrentHashMap 容量增加1,检查是否需要扩容 addCount(1L, binCount); return null; }