前置知识
👉👉👉 如果面试也能这样说HashMap,那么就不会有那么多遗憾!-CSDN博客
线程安全集合类概述
线程安全集合类可以分为三大类:
- 遗留的线程安全集合如 Hashtable , Vector (出现时间比较早,而且所有方法都是用synchronized修饰,并发性能比较低,时至今日有更好的实现,更好的替代)
- 使用 Collections 装饰的线程安全集合,如:(将原本不安全的集合变成安全的集合)
- Collections.synchronizedCollection
- Collections.synchronizedList
- Collections.synchronizedMap
- Collections.synchronizedSet
- Collections.synchronizedNavigableMap
- Collections.synchronizedNavigableSet
- Collections.synchronizedSortedMap
- Collections.synchronizedSortedSet
private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} }
传入的就是线程不安全的map,将其变成线程安全的
本质上就是多加了一个synchronized 锁住了对象
- java.util.concurrent.*
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:
Blocking、CopyOnWrite、Concurrent
- Blocking 大部分实现基于锁,并提供用来阻塞的方法(很多方法在不满足条件的时候需要等待)
- CopyOnWrite 之类容器修改开销相对较重(适用于读多写少)
- Concurrent 类型的容器
内部很多操作使用 cas 优化,一般可以提供较高吞吐量
弱一致性
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历
一致性 和 性能 两者不可兼得
ConcurrentHashMap
生成测试数据
static final String ALPHA = "abcedfghijklmnopqrstuvwxyz"; public static void main(String[] args) { int length = ALPHA.length(); int count = 200; List<String> list = new ArrayList<>(length * count); for (int i = 0; i < length; i++) { char ch = ALPHA.charAt(i); for (int j = 0; j < count; j++) { list.add(String.valueOf(ch)); } } Collections.shuffle(list); for (int i = 0; i < 26; i++) { try (PrintWriter out = new PrintWriter( new OutputStreamWriter( new FileOutputStream("tmp/" + (i+1) + ".txt")))) { String collect = list.subList(i * count, (i + 1) * count).stream() .collect(Collectors.joining("\n")); out.print(collect); } catch (IOException e) { } } }
模版代码,模版代码中封装了多线程读取文件的代码
private static <V> void demo(Supplier<Map<String,V>> supplier, BiConsumer<Map<String,V>,List<String>> consumer) { Map<String, V> counterMap = supplier.get(); List<Thread> ts = new ArrayList<>(); for (int i = 1; i <= 26; i++) { int idx = i; Thread thread = new Thread(() -> { List<String> words = readFromFile(idx); consumer.accept(counterMap, words); }); ts.add(thread); } ts.forEach(t->t.start()); ts.forEach(t-> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(counterMap); } public static List<String> readFromFile(int i) { ArrayList<String> words = new ArrayList<>(); try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream("tmp/" + i +".txt")))) { while(true) { String word = in.readLine(); if(word == null) { break; } words.add(word); } return words; } catch (IOException e) { throw new RuntimeException(e); } }
你要做的是实现两个参数
- 一是提供一个 map 集合,用来存放每个单词的计数结果,key 为单词,value 为计数
- 二是提供一组操作,保证计数的安全性,会传递 map 集合以及 单词 List
正确结果输出应该是每个单词出现 200 次
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200, m=200, n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}
下面的实现为:
demo( // 创建 map 集合 // 创建 ConcurrentHashMap 对不对? () -> new HashMap<String, Integer>(), // 进行计数 (map, words) -> { for (String word : words) { Integer counter = map.get(word); int newValue = counter == null ? 1 : counter + 1; map.put(word, newValue); } });
结果和预期的并不一样
将HashMap 换成 ConcurrentHashMap,发现还是不行!
如果将其改成了ConcurrentHashMap的话,那么其实下面的几行也不是原子的,因为使用了线程安全的集合,只能保证,每行是原子的,但是整体不是
或者加上 synchronized 锁。但是这样做没有好处,ConcurrentHashMap本身加的就是细粒度的锁,你使用synchronized 这么重的锁,影响性能,影响并发度。
ConcurrentHashMap 原理
JDK 7 HashMap 并发死链
注意
要在 JDK 7 下运行,否则扩容机制和 hash 的计算方法都变了
以下测试代码是精心准备的,不要随便改动
public static void main(String[] args) { // 测试 java 7 中哪些数字的 hash 结果相等 System.out.println("长度为16时,桶下标为1的key"); for (int i = 0; i < 64; i++) { if (hash(i) % 16 == 1) { System.out.println(i); } } System.out.println("长度为32时,桶下标为1的key"); for (int i = 0; i < 64; i++) { if (hash(i) % 32 == 1) { System.out.println(i); } } // 1, 35, 16, 50 当大小为16时,它们在一个桶内 final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>(); // 放 12 个元素 map.put(2, null); map.put(3, null); map.put(4, null); map.put(5, null); map.put(6, null); map.put(7, null); map.put(8, null); map.put(9, null); map.put(10, null); map.put(16, null); map.put(35, null); map.put(1, null); System.out.println("扩容前大小[main]:"+map.size()); new Thread() { @Override public void run() { // 放第 13 个元素, 发生扩容 map.put(50, null); System.out.println("扩容后大小[Thread-0]:"+map.size()); } }.start(); new Thread() { @Override public void run() { // 放第 13 个元素, 发生扩容 map.put(50, null); System.out.println("扩容后大小[Thread-1]:"+map.size()); } }.start(); } final static int hash(Object k) { int h = 0; if (0 != h && k instanceof String) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); }
死链复现
调试工具使用 idea
在 HashMap 源码 590 行加断点
int newCapacity = newTable.length;
断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0 或 Thread-1 时停下来
newTable.length==32 && ( Thread.currentThread().getName().equals("Thread-0")|| Thread.currentThread().getName().equals("Thread-1") )
断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行
运行代码,程序在预料的断点位置停了下来,输出
长度为16时,桶下标为1的key 1 16 35 50 长度为32时,桶下标为1的key 1 35 扩容前大小[main]:12
在jdk7 中,hashmap是采用头插法插入的
接下来进入扩容流程调试
在 HashMap 源码 594 行加断点
Entry<K,V> next = e.next; // 593 if (rehash) // 594 // ...
这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,再 594 处再添加一个断点(条件
Thread.currentThread().getName().equals(“Thread-0”))
这时可以在 Variables 面板观察到 e 和 next 变量,使用 view as -> Object 查看节点状态
e (1)->(35)->(16)->null next (35)->(16)->null
在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成
newTable[1] (35)->(1)->null 扩容后大小:13
这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为
e (1)->null next (35)->(1)->null
为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结果正确,但它结束后 Thread-0 还要继续运行
接下来就可以单步调试(F8)观察死链的产生了
下一轮循环到 594,将 e 搬迁到 newTable 链表头
newTable[1] (1)->null e (35)->(1)->null next (1)->null
下一轮循环到 594,将 e 搬迁到 newTable 链表头
newTable[1] (35)->(1)->null e (1)->null next null
再看看源码
e.next = newTable[1]; // 这时 e (1,35) // 而 newTable[1] (35,1)->(1,35) 因为是同一个对象 // 相当于 1 -> 35 -> 1 newTable[1] = e; // 再尝试将 e 作为链表头, 死链已成 e = next; // 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了
源码分析
HashMap 的并发死链发生在扩容时
void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while(null != e) { Entry<K,V> next = e.next; // 1 处 if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); // 2 处 // 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 next e.next = newTable[i]; newTable[i] = e; e = next; } } }
假设 map 中初始元素是
原始链表,格式:[下标] (key,next) [1] (1,35)->(35,16)->(16,null) 线程 a 执行到 1 处 ,此时局部变量 e 为 (1,35),而局部变量 next 为 (35,16) 线程 a 挂起 线程 b 开始执行 第一次循环 [1] (1,null) 第二次循环 [1] (35,1)->(1,null) 第三次循环 [1] (35,1)->(1,null) [17] (16,null) 切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而 next 的内 容被改为 (35,1) 并链向 (1,null) 第一次循环 [1] (1,null) 第二次循环,注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null) [1] (35,1)->(1,null) 第三次循环,e 是 (1,null),而 next 是 null,但 e 被放入链表头,这样 e.next 变成了 35 (2 处) [1] (1,35)->(35,1)->(1,35) 已经是死链了
小结
- 究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
- JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能
够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)
剑指JUC原理-19.线程安全集合(下):https://developer.aliyun.com/article/1413695