在日常开发中,我们几乎每天都在和集合容器打交道,HashMap作为最常用的键值对容器,以其O(1)的读写性能成为开发首选。但一旦进入高并发场景,HashMap的线程不安全特性会直接导致数据丢失、容器崩溃等致命问题,而Hashtable、Collections.synchronizedMap这类全表加锁的同步容器,又会在高并发下出现严重的性能瓶颈,成为系统的吞吐量瓶颈。
高并发场景下,我们需要的是兼顾线程安全与极致性能的并发容器,而ConcurrentHashMap作为其中的标杆,几乎是高并发开发的必备技能。但很多开发者对它的认知只停留在“线程安全的HashMap”,不仅没有发挥出它的全部性能优势,还经常踩中线程安全的坑,甚至在不合适的场景强行使用,导致系统性能下降。
本文将从底层原理、核心API、全场景选型、生产避坑、最佳实践五个维度,彻底讲透以ConcurrentHashMap为核心的高并发容器体系,让你既能夯实底层基础,又能直接解决生产环境的实际问题。
一、为什么普通容器扛不住高并发?
1. HashMap的线程不安全致命问题
HashMap的线程不安全主要体现在三个核心维度:
- 并发put导致数据丢失:当两个线程同时对同一个桶位执行put操作,会出现其中一个线程的插入数据被覆盖,最终数据丢失的问题。
- 并发扩容导致结构异常:JDK7的HashMap在扩容时采用头插法,并发扩容会导致链表形成循环,当get一个不存在的key时,会触发死循环导致CPU占用100%。JDK8虽然改为尾插法解决了死循环问题,但依然存在数据丢失、数据覆盖的风险。
- 并发迭代触发快速失败:HashMap的迭代器采用fail-fast机制,迭代过程中如果有其他线程修改了容器结构,会立刻抛出ConcurrentModificationException,导致程序异常中断。
2. 同步容器的性能灾难
为了解决HashMap的线程不安全问题,很多开发者会使用Hashtable或者Collections.synchronizedMap,但这两个容器的实现原理都是全表加锁,所有读写操作都要竞争同一把锁,同一时间只能有一个线程执行操作,即使是两个完全无关的get操作,也要排队等待锁释放。在高并发场景下,锁竞争会极其激烈,吞吐量急剧下降,完全无法满足高并发的性能要求。
二、ConcurrentHashMap 核心原理深度拆解
1. 核心数据结构
JDK17的ConcurrentHashMap底层核心数据结构是Node数组 + 单向链表 + 红黑树,底层CAS操作全面替换为VarHandle,消除了对Unsafe的依赖,同时优化了自旋逻辑,对虚拟线程更加友好。
核心结构说明:
- table:volatile修饰的Node数组,是ConcurrentHashMap的核心存储结构,数组长度始终保持2的幂次,保证hash定位的效率。
- Node:链表的基础节点,核心属性包括hash、key、volatile修饰的val、volatile修饰的next。val和next的volatile修饰,是get方法可以无锁执行的核心原因。
- TreeNode:红黑树的节点,当链表长度达到阈值时,会转换为TreeNode节点,但不会直接转为红黑树,而是通过TreeBin包装。
- TreeBin:红黑树的包装类,维护红黑树的根节点和读写锁状态,保证红黑树并发读写的线程安全,同时支持无锁读。
- ForwardingNode:扩容时的特殊节点,当一个桶位的节点完成迁移后,会在原数组桶位放置该节点,其他线程操作时看到该节点,会协助执行扩容操作。
这里需要纠正一个广泛传播的错误认知:链表长度达到8,并不会立刻转为红黑树。只有同时满足两个条件,才会触发红黑树转换:① 链表长度 >= 8;② 数组table的长度 >= 64。如果数组长度小于64,会优先触发数组扩容,而非转红黑树。这个设计的核心逻辑是,数组长度过小时hash碰撞概率更高,扩容可以从根本上减少碰撞,效果远优于转红黑树。
2. 线程安全的核心:锁粒度与锁优化
ConcurrentHashMap能兼顾线程安全和高性能的核心,是极致的锁粒度控制。它放弃了JDK7的分段锁设计,采用桶级锁实现,锁的对象是每个桶位的头节点,只有当两个线程操作同一个桶位时,才会发生锁竞争,不同桶位的操作完全并行,极大提升了并发度。
同时,ConcurrentHashMap使用synchronized锁而非ReentrantLock,核心原因有三点:
- JDK1.6之后,synchronized已经实现了偏向锁、轻量级锁、重量级锁的自适应优化,在低竞争场景下,性能与ReentrantLock持平甚至更优。
- synchronized是JVM原生支持的锁,会自动释放锁,不会出现因异常导致锁无法释放的问题,代码安全性更高。
- 锁粒度已经缩小到桶位头节点,竞争概率极低,大部分场景下只会用到偏向锁或轻量级锁,不需要ReentrantLock的高级功能,同时synchronized更节省内存,每个Node节点无需维护额外的锁对象。
3. 核心方法底层原理
3.1 put方法:原子插入的全流程
put方法是ConcurrentHashMap最核心的写入方法,整个流程全程保证线程安全,同时最大化性能,核心流程如下:
核心流程拆解:
- 数组初始化:如果数组未初始化,会调用initTable方法,通过CAS操作sizeCtl变量,保证只有一个线程能完成数组初始化,其他线程自旋等待。
- 桶位定位:通过key的hashCode计算hash值,用hash & (table.length - 1)定位到对应桶位,保证O(1)的定位效率。
- 空桶位插入:如果桶位为空,直接通过CAS操作插入新节点,无需加锁,性能极高。
- 扩容协助:如果桶位头节点是ForwardingNode,说明当前容器正在扩容,当前线程会协助执行扩容操作,充分利用多线程性能加快扩容速度。
- 加锁写入:如果桶位不为空且非扩容节点,用synchronized加锁桶位头节点,遍历链表或红黑树,key已存在则更新value,不存在则在链表尾部插入新节点。
- 结构转换:插入完成后,判断链表长度是否>=8,若是则再判断数组长度是否>=64,满足则转红黑树,不满足则触发扩容。
- 计数与扩容判断:最后调用addCount方法更新元素总数,判断总数是否达到扩容阈值,若达到则触发扩容流程。
3.2 get方法:无锁读的核心奥秘
ConcurrentHashMap的get方法全程无锁,性能与HashMap几乎持平,核心原理是volatile关键字的内存可见性保证,结合JMM的happen-before规则:
- Node节点的val属性是volatile修饰的,对volatile变量的写操作happen-before于后续对这个变量的读操作,其他线程修改了节点的value,get线程能立刻看到最新值。
- Node节点的next属性是volatile修饰的,遍历链表时能看到最新的节点结构,不会出现遍历过程中节点丢失的问题。
- table数组是volatile修饰的,扩容后的新数组能立刻被其他线程看到,不会出现扩容后读不到新数组的问题。
get方法的核心流程:
- 计算key的hash值,定位到对应桶位。
- 桶位为空则直接返回null。
- 桶位头节点的hash和key匹配,直接返回头节点的val。
- 头节点是ForwardingNode,说明正在扩容,前往新数组查找对应key。
- 若是链表或红黑树,遍历结构找到匹配的key,返回对应的val。
整个流程无任何加锁操作,完全靠volatile保证可见性,因此get方法性能极高,即使高并发场景下也不会有锁竞争,这也是ConcurrentHashMap在读多写少场景下性能远超同步容器的核心原因。
3.3 remove方法:安全删除的实现
remove方法的核心逻辑与put方法类似,基于桶级锁实现,核心流程:
- 计算key的hash值,定位到对应桶位。
- 桶位为空则直接返回null。
- 桶位是ForwardingNode,协助扩容后重试。
- 加锁桶位头节点,遍历链表或红黑树,找到匹配的节点。
- 若是链表,修改前一个节点的next指针,指向当前节点的next节点完成删除。
- 若是红黑树,删除对应节点,判断是否需要转回链表(红黑树节点数<=6时转回链表)。
- 释放锁,调用addCount方法更新元素计数,返回被删除的value。
整个删除过程,只有操作同一个桶位的线程才会竞争锁,不同桶位的删除操作完全并行,保证了高并发下的删除性能。
3.4 size方法:最终一致性的计数
size()方法有两个核心的使用误区,也是90%开发者都会踩的坑:
- size()方法返回的是最终一致性结果,而非强一致性结果,高并发场景下,返回结果可能与实际元素个数不一致,因为计数过程中其他线程可能正在插入或删除元素。
- size()方法在高并发下性能很差,它需要遍历所有CounterCell数组求和计算总数,不适合频繁调用。
ConcurrentHashMap的计数底层基于LongAdder的分段计数思想实现,核心是baseCount和CounterCell数组:
- 无竞争时,直接用CAS更新baseCount计数。
- 有竞争时,创建CounterCell数组,每个线程被哈希到不同的CounterCell单元格,更新对应单元格的数值,避免CAS竞争。
- 调用size()方法时,将baseCount和所有CounterCell单元格的数值求和,返回总数。
这个设计的核心是空间换时间,把竞争分散到不同单元格,极大提升了高并发下的计数性能,但代价是无法返回强一致性结果。
4. 多线程协助扩容:高并发下的性能黑科技
ConcurrentHashMap的扩容机制是其最亮眼的设计之一,采用多线程协助扩容机制,避免了单线程扩容的性能瓶颈,即使是大数组的扩容,也能在多线程协助下快速完成。
扩容的触发条件:
- 元素总数达到扩容阈值(数组长度 * 负载因子,默认0.75)。
- 链表长度>=8,但数组长度<64,会触发扩容。
扩容的核心流程:
- 新建一个容量为原数组2倍的新数组,数组长度始终保持2的幂次。
- 把原数组分成多个任务段,每个任务段默认16个桶位,每个线程认领一个任务段,负责该段内节点的迁移。
- 线程从后往前遍历负责的任务段,对每个桶位的节点进行迁移,迁移完成后,在原数组桶位放置ForwardingNode,标记该桶位已迁移完成。
- 其他线程执行put、remove等操作时,发现桶位是ForwardingNode,会协助认领任务段参与扩容,直到所有桶位迁移完成。
- 所有桶位迁移完成后,将table数组指向新数组,更新sizeCtl为新数组的扩容阈值,扩容完成。
这个设计的优势非常明显:多线程并行迁移,充分利用多核CPU性能,扩容速度与CPU核心数成正比;迁移过程中,已完成迁移的桶位可继续提供读写服务,只有正在迁移的桶位会被加锁,极大减少了扩容对业务的影响。
三、高并发容器全场景选型指南
除了ConcurrentHashMap,JDK还提供了一系列高并发容器,分别适用于不同的业务场景,选错容器不仅会导致性能下降,还可能出现线程安全问题。下面对常用高并发容器进行全维度对比,给出明确的选型标准。
1. 高并发Map容器选型
Map容器主要分为无序和有序两大类,核心选型对比如下:
| 容器名称 | 核心特性 | 时间复杂度 | 线程安全实现 | 最佳适用场景 |
| ConcurrentHashMap | 无序键值对,key/value非空 | 读写O(1) | 桶级synchronized+CAS,数组+链表+红黑树 | 绝大多数高并发键值对场景,如缓存、计数、去重、会话管理 |
| ConcurrentSkipListMap | 有序键值对,支持范围查询,key/value非空 | 读写O(logn) | 跳表+CAS无锁实现 | 高并发下需要有序、范围查询的场景,如排行榜、时间范围数据、有序缓存 |
选型边界非常清晰:
- 只需快速存取键值对,无需有序,优先选择ConcurrentHashMap,它的性能是所有Map容器中最高的。
- 需要对key排序,或需要执行范围查询,必须选择ConcurrentSkipListMap,ConcurrentHashMap无法支持高效的范围查询。
2. 高并发List容器选型
List容器的核心需求是有序、可重复、支持随机访问,高并发场景下的核心选型是CopyOnWriteArrayList,核心特性对比如下:
| 容器名称 | 核心特性 | 线程安全实现 | 读性能 | 写性能 | 最佳适用场景 |
| CopyOnWriteArrayList | 读无锁,写时复制,最终一致性,支持随机访问 | 写操作加ReentrantLock,读操作无锁,写时复制新数组 | 极高,与ArrayList持平 | 极低,每次写都要复制数组,内存开销大 | 读远多于写,写操作极少的场景,如配置白名单、静态数据列表、系统参数 |
| Vector | 全表加锁,强一致性 | 所有方法都用synchronized修饰,全表锁 | 低,所有读操作都要加锁竞争 | 低,所有写操作都要加锁竞争 | 已废弃,不推荐使用 |
| Collections.synchronizedList | 全表加锁,强一致性 | 包装ArrayList,所有方法都用synchronized修饰mutex对象 | 低,所有读操作都要加锁竞争 | 低,所有写操作都要加锁竞争 | 不推荐在高并发场景使用 |
CopyOnWriteArrayList的核心原理是写时复制(Copy-On-Write):
- 所有读操作无锁,直接读取当前数组,性能极高。
- 所有写操作(add、remove、set)都会加ReentrantLock锁,复制一个新的数组,在新数组上执行修改操作,修改完成后将数组引用指向新数组,最后释放锁。
- 迭代器是快照迭代器,创建时拿到当前数组的快照,迭代过程中不会受其他线程修改的影响,不会抛出ConcurrentModificationException,但也无法看到最新的修改。
3. 高并发队列容器选型
队列容器主要用于生产者消费者模型,是高并发开发中解耦、异步、削峰的核心组件,分为非阻塞队列和阻塞队列两大类。
3.1 非阻塞队列
非阻塞队列基于CAS无锁实现,不会阻塞线程,高并发下性能极高,核心代表是ConcurrentLinkedQueue。
ConcurrentLinkedQueue是无界非阻塞队列,基于单向链表实现,入队和出队操作都基于CAS无锁实现,无需加锁,核心特性:
- 无界队列,理论上可无限添加元素,不会阻塞生产者线程,使用时必须注意内存溢出问题。
- 入队和出队都是O(1)时间复杂度,高并发下吞吐量远超阻塞队列。
- size()方法通过遍历整个链表实现,性能极差,高并发下禁止调用。
- 不支持阻塞操作,队列为空时,出队操作直接返回null,不会阻塞消费者线程。
最佳适用场景:高并发下的无界生产消费模型,无需阻塞等待,生产者速度可控的场景,如异步任务队列、日志收集队列。
3.2 阻塞队列
阻塞队列实现了BlockingQueue接口,支持阻塞操作:队列满时,生产者线程会被阻塞,直到队列有空闲位置;队列为空时,消费者线程会被阻塞,直到队列有元素。是线程池、消息中间件的核心组件,核心选型对比如下:
| 容器名称 | 核心特性 | 线程安全实现 | 最佳适用场景 |
| ArrayBlockingQueue | 有界阻塞队列,固定容量,基于数组实现,入队出队共用一把锁 | ReentrantLock+Condition | 固定容量的生产者消费者模型,需要严格控制队列长度的场景,如线程池的核心工作队列 |
| LinkedBlockingQueue | 可选有界/无界阻塞队列,基于链表实现,入队出队双锁分离,吞吐量更高 | 入队和出队分别用独立的ReentrantLock+Condition | 吞吐量优先的生产者消费者模型,如日志异步处理、异步消息转发 |
| SynchronousQueue | 同步队列,容量为0,每个入队操作必须等待一个出队操作,反之亦然 | CAS+LockSupport | 直接传递的生产者消费者模型,如Executors.newCachedThreadPool()的默认工作队列 |
| PriorityBlockingQueue | 无界优先级阻塞队列,支持元素优先级排序 | ReentrantLock+CAS | 需要按优先级处理任务的场景,如优先级任务调度、VIP用户请求优先处理 |
| DelayQueue | 无界延迟阻塞队列,元素必须实现Delayed接口,只有延迟时间到了才能出队 | 基于PriorityBlockingQueue实现 | 延迟任务处理场景,如订单超时取消、缓存过期清理、定时任务调度 |
四、高并发容器90%开发者都会踩的坑
即使使用了ConcurrentHashMap等高并发容器,也不代表代码就是线程安全的,很多开发者因为对容器特性理解不到位,踩了很多线程安全的坑,这里总结了最常见的8个坑,每个坑都给出错误示例和正确解决方案。
1. 复合操作的线程安全坑
很多开发者以为用了ConcurrentHashMap,所有操作都是线程安全的,但手动组合的复合操作,比如先判断再操作,并不是原子操作,中间可能被其他线程修改,导致线程安全问题。
错误示例:
package com.jam.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ConcurrentHashMapDemo {
private final ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
/**
* 错误的复合操作,非线程安全
* @param key 键
* @param value 值
* @author ken
*/
public void wrongCompoundOperation(String key, String value) {
if (!concurrentHashMap.containsKey(key)) {
concurrentHashMap.put(key, value);
log.info("key:{} 插入成功", key);
}
}
}
正确解决方案:使用ConcurrentHashMap提供的原子复合操作API,比如putIfAbsent、computeIfAbsent、merge等,这些API底层保证了整个操作的原子性,不会被其他线程打断。
正确示例:
package com.jam.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ConcurrentHashMapDemo {
private final ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
/**
* 正确的原子插入操作,线程安全
* @param key 键
* @param value 值
* @author ken
*/
public void correctPutIfAbsent(String key, String value) {
String oldValue = concurrentHashMap.putIfAbsent(key, value);
if (ObjectUtils.isEmpty(oldValue)) {
log.info("key:{} 插入成功", key);
} else {
log.info("key:{} 已存在,旧值:{}", key, oldValue);
}
}
/**
* 正确的原子查询插入操作,key不存在时才执行创建逻辑
* @param key 键
* @return 对应的值
* @author ken
*/
public String correctComputeIfAbsent(String key) {
return concurrentHashMap.computeIfAbsent(key, k -> {
return queryValueFromDb(k);
});
}
/**
* 模拟从数据库查询数据
* @param key 键
* @return 查询结果
* @author ken
*/
private String queryValueFromDb(String key) {
log.info("执行数据库查询,key:{}", key);
return "value:" + key;
}
}
2. key/value为null的空指针坑
ConcurrentHashMap明确禁止key和value为null,核心原因是并发场景下的歧义问题:当调用get(key)返回null时,无法判断是key不存在,还是key对应的value就是null。在单线程的HashMap中,可以用containsKey(key)验证,但在并发场景下,containsKey和get之间,key可能被其他线程删除,导致验证结果不可靠。
永远不要给ConcurrentHashMap的key和value赋值为null,如果需要表示空值,用自定义的空对象或者空字符串代替。
3. size()方法的误用坑
size()方法返回的是最终一致性结果,不是强一致性的,而且高并发下性能很差,很多开发者会在业务逻辑里频繁调用size()方法,甚至用它做条件判断,导致业务逻辑错误。
错误示例:
/**
* 错误的size()方法使用
* @author ken
*/
public void wrongSizeUsage() {
if (concurrentHashMap.size() > 1000) {
doLimit();
}
}
正确解决方案:
- 仅判断容器是否为空,用isEmpty()方法,性能远高于size()。
- 需要精确的强一致性计数,单独用LongAdder维护计数,与容器操作保持原子性。
- 永远不要在高并发场景下频繁调用size()方法,尤其是循环内调用。
4. 迭代器的弱一致性坑
ConcurrentHashMap的迭代器是弱一致性的,迭代过程中,其他线程对容器的修改,可能会反映到迭代器中,也可能不会,不会抛出ConcurrentModificationException。
正确的使用方式:
- 迭代操作仅适合对数据一致性要求不高的遍历场景,如数据统计、日志打印。
- 需要强一致性的遍历,必须手动加锁,或在业务低峰期执行。
- 永远不要在迭代器里执行元素的add/remove操作,避免数据不一致。
5. 初始化容量设置不当的扩容坑
很多开发者创建ConcurrentHashMap时,直接用无参构造函数,默认容量16,负载因子0.75,当元素个数达到12时就会触发扩容,插入100万条数据会触发十几次扩容,严重影响性能。
正确的初始化容量计算公式:初始容量 = (预期最大元素个数 / 负载因子) + 1负载因子默认0.75,比如预期最大元素个数是1000,初始容量=1000/0.75 +1 ≈ 1334,向上取2的幂次是2048,可保证元素个数达到1000时不会触发扩容。
正确示例:
/**
* 正确的ConcurrentHashMap初始化
* @author ken
*/
public void correctInit() {
int expectSize = 1000;
int initCapacity = (int) (expectSize / 0.75f) + 1;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(initCapacity);
}
6. compute/merge回调函数的阻塞坑
很多开发者会在compute、computeIfAbsent、merge方法的回调函数里执行耗时操作,比如数据库查询、远程调用等,这是一个巨大的坑。回调函数执行期间,会持有当前桶位的synchronized锁,如果回调函数耗时很长,会导致其他操作同一个桶位的线程全部被阻塞,锁竞争急剧激烈,吞吐量急剧下降。
错误示例:
/**
* 错误的computeIfAbsent使用,回调函数里有耗时操作
* @param key 键
* @return 对应的值
* @author ken
*/
public String wrongComputeIfAbsent(String key) {
return concurrentHashMap.computeIfAbsent(key, k -> {
return remoteCallService.queryValue(k);
});
}
正确解决方案:回调函数里只执行简单的、无耗时的、非阻塞的操作,耗时操作提前执行,放到回调函数外面。
正确示例:
/**
* 正确的耗时操作处理,避免持有锁时执行耗时逻辑
* @param key 键
* @return 对应的值
* @author ken
*/
public String correctComputeWithTimeConsuming(String key) {
String value = concurrentHashMap.get(key);
if (ObjectUtils.isEmpty(value)) {
String newValue = remoteCallService.queryValue(key);
value = concurrentHashMap.putIfAbsent(key, newValue);
if (ObjectUtils.isEmpty(value)) {
value = newValue;
}
}
return value;
}
7. CopyOnWriteArrayList的写频繁坑
CopyOnWriteArrayList的每次写操作都要复制整个数组,数据量越大,写操作的性能越差,内存开销越大。频繁写会导致频繁的数组复制,内存占用飙升,频繁GC,严重影响系统稳定性。
正确的使用方式:CopyOnWriteArrayList只适合读远多于写,写操作极少的场景,比如配置白名单、静态数据列表;写频繁的场景,绝对不要使用CopyOnWriteArrayList。
8. 无界队列的内存溢出坑
很多开发者使用ConcurrentLinkedQueue、无界模式的LinkedBlockingQueue时,不控制队列长度,当生产者速度远大于消费者速度时,队列元素会无限堆积,最终导致内存溢出,系统崩溃。
正确的使用方式:生产环境必须使用有界队列,设置合理的最大容量,防止内存溢出;必须监控队列长度,当队列长度达到阈值时,触发限流、告警,防止队列无限堆积。
五、生产环境高并发容器最佳实践
1. 合理设置初始容量,提前规避扩容风险
集合初始化时,必须根据预期的最大元素个数,设置合理的初始容量,避免运行期间频繁扩容,影响性能。对于队列容器,必须设置合理的最大容量,使用有界队列,防止内存溢出。
2. 优先使用官方原子操作API,避免手动实现复合操作
ConcurrentHashMap等高并发容器提供的原子复合操作API,底层已经保证了线程安全,性能经过了极致优化,永远不要自己手动组合多个操作实现复合逻辑,避免出现线程安全问题。
3. 读写分离,选择合适的容器
根据读写比例选择合适的容器:
- 读多写少的Map场景,选ConcurrentHashMap,get方法无锁,性能极高。
- 读多写少的List场景,选CopyOnWriteArrayList,读无锁,写时复制。
- 写多读少的场景,优先选择桶级锁的容器,避免全表锁和写时复制的容器。
4. 避免在锁持有期间执行耗时操作
不管是ConcurrentHashMap的compute回调函数,还是CopyOnWriteArrayList的写操作,都不要在持有锁期间执行耗时操作、远程调用、阻塞操作,避免锁竞争加剧,吞吐量下降。
5. 关注key的hash分布,避免hash倾斜
ConcurrentHashMap的性能依赖于key的hashCode分布均匀,如果hashCode分布不均匀,会导致大量key集中在同一个桶位,锁竞争激烈,性能急剧下降。必须重写好key的hashCode和equals方法,保证hashCode分布均匀。
6. 优先使用JDK原生并发容器,避免重复造轮子
JDK提供的并发容器经过了十几年的生产环境验证,性能和稳定性都是顶级的,永远不要自己实现线程安全的容器,避免出现线程安全问题和性能问题。
7. 监控与调优,提前发现风险
生产环境必须对高并发容器进行监控,核心监控指标包括:Map容器的元素个数、扩容次数、桶位碰撞率;队列容器的队列长度、入队出队TPS、堆积元素个数。根据监控数据,及时调整容器参数,提前发现性能瓶颈和风险。
核心总结
高并发容器是高并发开发的基础,也是面试的核心考点。很多开发者只停留在“会用”的层面,没有理解底层的原理和特性,导致踩了很多线程安全的坑,系统性能上不去,甚至出现生产事故。
本文从底层原理、核心API、全场景选型、生产避坑、最佳实践五个维度,彻底讲透了以ConcurrentHashMap为核心的高并发容器体系,核心选型心法:
- 高并发无序键值对场景,首选ConcurrentHashMap,它的桶级锁+无锁读设计,兼顾了线程安全和极致性能。
- 高并发有序键值对、范围查询场景,选ConcurrentSkipListMap。
- 读远多于写的List场景,选CopyOnWriteArrayList,写频繁场景绝对不要用。
- 高并发生产消费模型,优先使用有界阻塞队列,避免无界队列导致的内存溢出。
- 永远使用官方提供的原子复合操作API,不要手动组合操作,避免线程安全问题。
高并发开发的核心,不是用了多少高大上的技术,而是对每一个技术点的底层原理有清晰的认知,知道它的优势、劣势、适用场景和避坑点,只有这样,才能写出稳定、高性能的高并发代码。
附录:项目依赖与完整示例代码
pom.xml 核心依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jam</groupId>
<artifactId>concurrent-demo</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>6.1.10</spring.version>
<lombok.version>1.18.34</lombok.version>
<guava.version>33.2.1-jre</guava.version>
<fastjson2.version>2.0.53</fastjson2.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
高并发性能对比测试示例
package com.jam.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ConcurrentMapPerformanceTest {
private static final int THREAD_COUNT = 100;
private static final int OPERATE_COUNT_PER_THREAD = 10000;
/**
* 并发写性能对比测试
* @author ken
*/
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
Hashtable<String, Integer> hashtable = new Hashtable<>();
StopWatch hashtableStopWatch = new StopWatch();
hashtableStopWatch.start();
CountDownLatch hashtableLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadIndex = i;
executorService.submit(() -> {
try {
for (int j = 0; j < OPERATE_COUNT_PER_THREAD; j++) {
String key = "key:" + threadIndex + ":" + j;
hashtable.put(key, j);
}
} finally {
hashtableLatch.countDown();
}
});
}
hashtableLatch.await();
hashtableStopWatch.stop();
log.info("Hashtable 并发写耗时:{}ms", hashtableStopWatch.getTotalTimeMillis());
Map<String, Integer> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
StopWatch synchronizedMapStopWatch = new StopWatch();
synchronizedMapStopWatch.start();
CountDownLatch synchronizedMapLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadIndex = i;
executorService.submit(() -> {
try {
for (int j = 0; j < OPERATE_COUNT_PER_THREAD; j++) {
String key = "key:" + threadIndex + ":" + j;
synchronizedMap.put(key, j);
}
} finally {
synchronizedMapLatch.countDown();
}
});
}
synchronizedMapLatch.await();
synchronizedMapStopWatch.stop();
log.info("Collections.synchronizedMap 并发写耗时:{}ms", synchronizedMapStopWatch.getTotalTimeMillis());
ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
StopWatch concurrentHashMapStopWatch = new StopWatch();
concurrentHashMapStopWatch.start();
CountDownLatch concurrentHashMapLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadIndex = i;
executorService.submit(() -> {
try {
for (int j = 0; j < OPERATE_COUNT_PER_THREAD; j++) {
String key = "key:" + threadIndex + ":" + j;
concurrentHashMap.put(key, j);
}
} finally {
concurrentHashMapLatch.countDown();
}
});
}
concurrentHashMapLatch.await();
concurrentHashMapStopWatch.stop();
log.info("ConcurrentHashMap 并发写耗时:{}ms", concurrentHashMapStopWatch.getTotalTimeMillis());
executorService.shutdown();
}
}
Swagger3 接口示例
package com.jam.demo.controller;
import com.jam.demo.service.ConcurrentMapService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/concurrent")
@RequiredArgsConstructor
@Tag(name = "高并发容器测试接口", description = "ConcurrentHashMap等高并发容器的测试接口")
public class ConcurrentMapController {
private final ConcurrentMapService concurrentMapService;
@PostMapping("/put")
@Operation(summary = "插入键值对", description = "原子插入键值对,key不存在时才插入")
public void put(
@Parameter(description = "键", required = true) @RequestParam String key,
@Parameter(description = "值", required = true) @RequestParam String value
) {
concurrentMapService.correctPutIfAbsent(key, value);
}
@GetMapping("/get")
@Operation(summary = "查询键值对", description = "根据key查询对应的value,key不存在时从数据库查询并缓存")
public String get(
@Parameter(description = "键", required = true) @RequestParam String key
) {
return concurrentMapService.correctComputeIfAbsent(key);
}
@DeleteMapping("/remove")
@Operation(summary = "删除键值对", description = "根据key删除对应的键值对")
public void remove(
@Parameter(description = "键", required = true) @RequestParam String key
) {
concurrentMapService.remove(key);
}
}
业务服务层示例
package com.jam.demo.service;
import com.jam.demo.mapper.UserMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
@RequiredArgsConstructor
public class ConcurrentMapService {
private final ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>(2048);
private final UserMapper userMapper;
/**
* 原子插入键值对,key不存在时才插入
* @param key 键
* @param value 值
* @author ken
*/
public void correctPutIfAbsent(String key, String value) {
String oldValue = concurrentHashMap.putIfAbsent(key, value);
if (ObjectUtils.isEmpty(oldValue)) {
log.info("key:{} 插入成功", key);
} else {
log.info("key:{} 已存在,旧值:{}", key, oldValue);
}
}
/**
* 原子查询键值对,key不存在时从数据库查询并缓存
* @param key 键
* @return 对应的值
* @author ken
*/
public String correctComputeIfAbsent(String key) {
return concurrentHashMap.computeIfAbsent(key, k -> {
log.info("key:{} 不存在,从数据库查询", k);
return userMapper.queryValueByKey(k);
});
}
/**
* 删除键值对
* @param key 键
* @return 被删除的值
* @author ken
*/
public String remove(String key) {
String oldValue = concurrentHashMap.remove(key);
log.info("key:{} 删除完成,旧值:{}", key, oldValue);
return oldValue;
}
}
数据访问层示例
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.User;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
public interface UserMapper extends BaseMapper<User> {
/**
* 根据key查询value
* @param key 键
* @return 值
* @author ken
*/
@Select("select value from t_config where `key` = #{key} limit 1")
String queryValueByKey(@Param("key") String key);
}