引言
在高并发业务场景中,系统的吞吐量、稳定性与可维护性,核心取决于异步解耦与故障隔离能力的落地。绝大多数线上高并发故障,本质都源于两个核心问题:一是同步调用导致的链路阻塞与响应超时,二是共用资源池引发的级联雪崩。生产者-消费者范式是异步解耦的核心骨架,线程池隔离则是故障隔离的黄金法则,二者结合,才能构建出能扛住百万级QPS、同时保证业务连续性的高并发系统。本文从底层原理到生产级代码实现,完整拆解这两个核心范式的落地逻辑,帮你彻底打通高并发编程的任督二脉。
一、生产者-消费者范式:从底层原理到生产级实现
1.1 范式的核心本质与业务价值
生产者-消费者是一种基于异步解耦的并发设计范式,核心是通过一个线程安全的中间缓冲区(队列),完全分离数据生产逻辑与数据消费逻辑。生产者仅需将数据写入缓冲区,无需关心消费逻辑的实现、执行时机与扩容策略;消费者仅需从缓冲区拉取数据处理,无需关心数据的生产来源与触发逻辑。
其核心业务价值集中在三点:
- 解耦:生产者与消费者无直接代码依赖,仅依赖缓冲区契约,双方可独立迭代、扩容与故障隔离,单一方异常不会直接导致全链路阻塞。
- 异步化:同步转异步,大幅缩短核心接口响应时长,将非核心逻辑从主链路剥离,提升用户体验。
- 削峰填谷:流量峰值期,生产者将请求写入缓冲区,消费者以固定处理能力匀速消费,避免峰值流量直接打垮下游数据库与第三方服务,是秒杀、大促场景的核心兜底能力。
1.2 底层线程通信机制全解析
生产者与消费者运行在不同线程中,必须解决线程间的同步与通信问题,避免出现数据丢失、重复消费、死锁与虚假唤醒等问题。Java中主流的线程通信机制分为三层,从底层原生实现到生产级封装逐层递进。
1.2.1 基于synchronized + wait/notify的原生实现
这是Java线程通信的底层基础,基于对象的监视器锁(Monitor)实现,JDK17中对synchronized做了全链路优化,从偏向锁、轻量级锁到重量级锁的自适应升级,性能已与显式Lock无显著差距。
核心规范(JDK官方强制要求):
- wait()、notify()、notifyAll()必须在synchronized同步块内执行,否则会抛出IllegalMonitorStateException
- wait()必须放在while循环中判断条件,禁止使用if判断,避免虚假唤醒导致的逻辑异常
- 优先使用notifyAll()而非notify(),避免信号丢失导致的线程永久阻塞
package com.jam.demo.base;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Queue;
@Slf4j
public class WaitNotifyModel {
private final Queue<Object> buffer;
private final int capacity;
public WaitNotifyModel(int capacity) {
this.buffer = new LinkedList<>();
this.capacity = capacity;
}
/**
* 生产者生产数据
* @param data 待生产的数据
* @throws InterruptedException 线程中断异常
*/
public synchronized void produce(Object data) throws InterruptedException {
while (buffer.size() == capacity) {
this.wait();
}
buffer.offer(data);
log.info("生产者生产数据,当前缓冲区大小:{}", buffer.size());
this.notifyAll();
}
/**
* 消费者消费数据
* @return 消费的数据
* @throws InterruptedException 线程中断异常
*/
public synchronized Object consume() throws InterruptedException {
while (buffer.isEmpty()) {
this.wait();
}
Object data = buffer.poll();
log.info("消费者消费数据,当前缓冲区大小:{}", buffer.size());
this.notifyAll();
return data;
}
}
1.2.2 基于Lock + Condition的精准唤醒实现
显式Lock与Condition组合,解决了synchronized只能有一个等待队列的缺陷,支持多条件队列实现精准唤醒,减少无效的线程上下文切换,底层基于JUC的AQS(抽象队列同步器)实现。
核心优势:可分别创建生产者等待队列(notFull)与消费者等待队列(notEmpty),缓冲区满时仅阻塞生产者,缓冲区空时仅阻塞消费者,唤醒时仅唤醒对应队列的线程,无需全量唤醒。
package com.jam.demo.base;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class LockConditionModel {
private final Queue<Object> buffer;
private final int capacity;
private final Lock lock;
private final Condition notFull;
private final Condition notEmpty;
public LockConditionModel(int capacity) {
this.buffer = new LinkedList<>();
this.capacity = capacity;
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
/**
* 生产者生产数据
* @param data 待生产的数据
* @throws InterruptedException 线程中断异常
*/
public void produce(Object data) throws InterruptedException {
lock.lockInterruptibly();
try {
while (buffer.size() == capacity) {
notFull.await();
}
buffer.offer(data);
log.info("生产者生产数据,当前缓冲区大小:{}", buffer.size());
notEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 消费者消费数据
* @return 消费的数据
* @throws InterruptedException 线程中断异常
*/
public Object consume() throws InterruptedException {
lock.lockInterruptibly();
try {
while (buffer.isEmpty()) {
notEmpty.await();
}
Object data = buffer.poll();
log.info("消费者消费数据,当前缓冲区大小:{}", buffer.size());
notFull.signal();
return data;
} finally {
lock.unlock();
}
}
}
1.2.3 基于JUC BlockingQueue的生产级封装
生产环境99%的场景,无需手动实现线程同步逻辑,直接使用JUC提供的BlockingQueue阻塞队列即可,其内部已完整实现了线程同步、等待唤醒与内存可见性保证,是生产者-消费者范式的标准实现载体。
核心特性:
- 缓冲区满时,生产者线程自动阻塞,直到缓冲区出现空闲位置
- 缓冲区空时,消费者线程自动阻塞,直到缓冲区写入新数据
- 所有操作均为线程安全,无并发安全问题
阻塞队列选型指南(生产级)
| 队列实现类 | 底层结构 | 有界/无界 | 核心特性 | 生产适用场景 |
| ArrayBlockingQueue | 定长数组 | 有界 | 单锁实现,读写不分离,支持公平锁,内存占用固定 | 容量固定的常规业务场景,严格控制内存占用 |
| LinkedBlockingQueue | 链表 | 可选有界/无界 | 双锁实现,读写分离,吞吐量更高,默认无界(高危) | 高吞吐量场景,必须强制指定容量 |
| SynchronousQueue | 无缓冲 | 有界(容量0) | 不存储元素,生产必须等待消费,一对一传递 | 短平快的无堆积任务场景,CachedThreadPool默认队列 |
| DelayQueue | 优先级堆 | 无界 | 延迟消费,元素必须实现Delayed接口 | 超时订单关闭、定时任务重试、缓存过期清理 |
| PriorityBlockingQueue | 二叉堆 | 无界 | 按优先级排序,支持自定义比较器 | VIP订单优先处理、分级任务调度场景 |
核心API选型规范
BlockingQueue提供了四类操作API,生产环境需严格按场景选型,禁止乱用:
| 操作类型 | 抛出异常 | 返回特殊值 | 无限阻塞 | 超时等待 |
| 数据插入 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 数据移除 | remove() | poll() | take() | poll(timeout, unit) |
| 数据查看 | element() | peek() | 不支持 | 不支持 |
生产环境强制规范:
- 插入操作优先使用
offer(e, timeout, unit),禁止无限阻塞的put(e),设置超时时间实现降级兜底,避免生产者线程永久阻塞 - 移除操作优先使用
poll(timeout, unit),禁止无限阻塞的take(),方便优雅停机与线程生命周期管理 - 绝对禁止使用无界队列,除非能100%保证生产速度永远不超过消费速度,否则必然导致OOM
1.3 生产级生产者-消费者完整实现
以电商订单异步事件处理为场景,实现带优雅停机、异常重试、监控告警的生产级代码,基于JDK17编写,完整可运行。
1.3.1 项目基础依赖(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.0</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>high-concurrency-demo</artifactId>
<version>1.0.0</version>
<name>high-concurrency-demo</name>
<properties>
<java.version>17</java.version>
<guava.version>33.2.0-jre</guava.version>
<fastjson2.version>2.0.53</fastjson2.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.3.2 事件载体定义
package com.jam.demo.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "订单事件实体")
public class OrderEvent implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@Schema(description = "事件ID")
private String eventId;
@Schema(description = "订单号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "事件类型")
private String eventType;
@Schema(description = "事件内容")
private String content;
@Schema(description = "重试次数")
private Integer retryTimes;
@Schema(description = "最大重试次数")
private Integer maxRetryTimes;
@Schema(description = "创建时间")
private LocalDateTime createTime;
}
1.3.3 事件生产者实现
package com.jam.demo.producer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.model.OrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class OrderEventProducer {
private static final int QUEUE_CAPACITY = 10000;
private static final long OFFER_TIMEOUT = 3L;
private final LinkedBlockingQueue<OrderEvent> eventQueue;
public OrderEventProducer() {
this.eventQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
}
/**
* 投递订单事件到缓冲区
* @param event 订单事件
* @return 投递结果
*/
public boolean sendEvent(OrderEvent event) {
if (org.springframework.util.ObjectUtils.isEmpty(event)) {
log.warn("订单事件为空,投递失败");
return false;
}
try {
boolean result = eventQueue.offer(event, OFFER_TIMEOUT, TimeUnit.SECONDS);
if (result) {
log.info("订单事件投递成功,eventId:{},队列剩余容量:{}", event.getEventId(), eventQueue.remainingCapacity());
} else {
log.error("订单事件投递超时,事件内容:{}", JSON.toJSONString(event));
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("订单事件投递被中断,eventId:{}", event.getEventId(), e);
return false;
}
}
/**
* 从缓冲区拉取事件(供消费者调用)
* @param timeout 超时时间
* @param unit 时间单位
* @return 订单事件
* @throws InterruptedException 线程中断异常
*/
public OrderEvent pollEvent(long timeout, TimeUnit unit) throws InterruptedException {
return eventQueue.poll(timeout, unit);
}
/**
* 获取当前队列积压数量
* @return 积压事件数
*/
public int getQueueSize() {
return eventQueue.size();
}
/**
* 获取队列剩余容量
* @return 剩余容量
*/
public int getRemainingCapacity() {
return eventQueue.remainingCapacity();
}
}
1.3.4 事件消费者实现
package com.jam.demo.consumer;
import com.jam.demo.model.OrderEvent;
import com.jam.demo.producer.OrderEventProducer;
import com.jam.demo.manager.ThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
public class OrderEventConsumer implements ApplicationRunner, DisposableBean {
private static final int CONSUMER_THREAD_NUM = 4;
private static final long POLL_TIMEOUT = 5L;
private final OrderEventProducer orderEventProducer;
private final ThreadPoolManager threadPoolManager;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
public OrderEventConsumer(OrderEventProducer orderEventProducer, ThreadPoolManager threadPoolManager) {
this.orderEventProducer = orderEventProducer;
this.threadPoolManager = threadPoolManager;
}
@Override
public void run(ApplicationArguments args) {
for (int i = 0; i < CONSUMER_THREAD_NUM; i++) {
Thread consumerThread = new Thread(this::doConsume, "order-event-consumer-" + i);
consumerThread.setDaemon(false);
consumerThread.start();
}
log.info("订单事件消费者启动完成,消费线程数:{}", CONSUMER_THREAD_NUM);
}
/**
* 核心消费逻辑
*/
private void doConsume() {
while (isRunning.get() || orderEventProducer.getQueueSize() > 0) {
try {
OrderEvent event = orderEventProducer.pollEvent(POLL_TIMEOUT, TimeUnit.SECONDS);
if (org.springframework.util.ObjectUtils.isEmpty(event)) {
continue;
}
dispatchEvent(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("消费线程被中断,线程名:{}", Thread.currentThread().getName(), e);
break;
} catch (Exception e) {
log.error("消费事件发生未知异常", e);
}
}
log.info("消费线程退出,线程名:{}", Thread.currentThread().getName());
}
/**
* 事件分发到对应业务线程池
* @param event 订单事件
*/
private void dispatchEvent(OrderEvent event) {
switch (event.getEventType()) {
case "SMS_NOTIFY" -> threadPoolManager.getSmsNotifyExecutor().execute(() -> handleSmsNotify(event));
case "LOGISTICS_SYNC" -> threadPoolManager.getLogisticsSyncExecutor().execute(() -> handleLogisticsSync(event));
case "POINT_GRANT" -> threadPoolManager.getPointGrantExecutor().execute(() -> handlePointGrant(event));
case "USER_PROFILE_UPDATE" -> threadPoolManager.getUserProfileExecutor().execute(() -> handleUserProfileUpdate(event));
default -> log.warn("未知事件类型,event:{}", event);
}
}
private void handleSmsNotify(OrderEvent event) {
log.info("处理短信通知事件,orderNo:{}", event.getOrderNo());
}
private void handleLogisticsSync(OrderEvent event) {
log.info("处理物流同步事件,orderNo:{}", event.getOrderNo());
}
private void handlePointGrant(OrderEvent event) {
log.info("处理积分赠送事件,orderNo:{}", event.getOrderNo());
}
private void handleUserProfileUpdate(OrderEvent event) {
log.info("处理用户画像更新事件,orderNo:{}", event.getOrderNo());
}
@Override
public void destroy() {
isRunning.set(false);
log.info("消费者开始优雅停机,剩余待处理事件数:{}", orderEventProducer.getQueueSize());
}
}
二、线程池隔离:高并发系统的舱壁模式落地
2.1 为什么必须做线程池隔离?
绝大多数高并发雪崩故障,都源于共用资源池的级联故障。在传统单线程池架构中,所有业务任务共用同一个线程池,一旦某一个慢任务(如第三方接口超时、慢SQL)占满了线程池,会导致所有业务任务都无法执行,最终整个服务不可用。
举个典型的电商场景:订单服务包含四个核心操作,下单扣库存、发送短信、同步物流、更新用户画像。如果物流第三方接口超时,导致线程池内的所有线程都阻塞在该接口调用上,那么核心的下单扣库存任务也无法获取线程执行,整个订单服务瘫痪,这就是典型的线程池雪崩。
线程池隔离的核心思想来自舱壁模式(Bulkhead Pattern),源于船舶设计:船舶的船舱被隔离为多个独立舱室,单个舱室进水不会导致整个船舶沉没。对应到系统中,就是将不同类型的业务任务,拆分到独立的线程池中执行,单个线程池因故障占满,不会影响其他线程池的任务执行,实现故障隔离,避免级联雪崩。
2.2 线程池核心原理与执行流程
线程池隔离的前提,是彻底理解线程池的核心参数与执行逻辑,否则隔离只会流于形式,无法真正实现故障隔离。
2.2.1 线程池核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数,线程池常驻的线程数量,默认不会被回收
- maximumPoolSize:最大线程数,线程池能容纳的最大线程数量
- keepAliveTime:非核心线程的空闲存活时间,超过该时间未执行任务的非核心线程会被回收
- unit:存活时间的单位
- workQueue:工作队列,存放等待执行的任务
- threadFactory:线程工厂,用于创建线程,生产环境必须自定义,设置线程名称与异常处理器,方便问题排查
- handler:拒绝策略,当线程池与工作队列都满时,处理新任务的策略
2.2.2 线程池执行流程
2.2.3 拒绝策略选型指南
JDK提供了4种默认拒绝策略,生产环境需按业务场景严格选型:
- AbortPolicy:默认策略,直接抛出RejectedExecutionException异常,适合核心业务,及时感知故障
- CallerRunsPolicy:由调用线程执行任务,实现自动降级,避免任务丢失,适合核心写业务
- DiscardPolicy:直接丢弃任务,不抛出异常,适合非核心、可丢失的业务
- DiscardOldestPolicy:丢弃队列中最旧的任务,重试提交当前任务,适合优先级均匀的场景
2.3 生产级线程池隔离完整实现
2.3.1 自定义线程工厂
package com.jam.demo.factory;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CustomThreadFactory implements ThreadFactory {
private final String threadNamePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final boolean isDaemon;
public CustomThreadFactory(String threadNamePrefix, boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon;
}
@Override
public Thread newThread(Runnable r) {
String threadName = threadNamePrefix + "-" + threadNumber.getAndIncrement();
Thread thread = new Thread(r, threadName);
thread.setDaemon(isDaemon);
thread.setUncaughtExceptionHandler((t, e) -> log.error("线程{}发生未捕获异常", t.getName(), e));
return thread;
}
}
2.3.2 自定义拒绝策略
package com.jam.demo.handler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private final String threadPoolName;
public CustomRejectedExecutionHandler(String threadPoolName) {
this.threadPoolName = threadPoolName;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("线程池[{}]触发拒绝策略,核心线程数:{},最大线程数:{},活跃线程数:{},队列积压数:{}",
threadPoolName,
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getActiveCount(),
executor.getQueue().size());
if (!executor.isShutdown()) {
r.run();
}
}
}
2.3.3 线程池管理器
package com.jam.demo.manager;
import com.jam.demo.factory.CustomThreadFactory;
import com.jam.demo.handler.CustomRejectedExecutionHandler;
import lombok.Getter;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Getter
@Component
public class ThreadPoolManager {
private static final int CPU_CORE_NUM = Runtime.getRuntime().availableProcessors();
private final ThreadPoolExecutor orderCoreExecutor;
private final ThreadPoolExecutor smsNotifyExecutor;
private final ThreadPoolExecutor logisticsSyncExecutor;
private final ThreadPoolExecutor pointGrantExecutor;
private final ThreadPoolExecutor userProfileExecutor;
public ThreadPoolManager() {
this.orderCoreExecutor = new ThreadPoolExecutor(
CPU_CORE_NUM,
CPU_CORE_NUM * 2,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new CustomThreadFactory("order-core-pool", false),
new CustomRejectedExecutionHandler("order-core-pool")
);
this.smsNotifyExecutor = new ThreadPoolExecutor(
CPU_CORE_NUM * 10,
CPU_CORE_NUM * 20,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000),
new CustomThreadFactory("sms-notify-pool", false),
new CustomRejectedExecutionHandler("sms-notify-pool")
);
this.logisticsSyncExecutor = new ThreadPoolExecutor(
CPU_CORE_NUM * 5,
CPU_CORE_NUM * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
new CustomThreadFactory("logistics-sync-pool", false),
new CustomRejectedExecutionHandler("logistics-sync-pool")
);
this.pointGrantExecutor = new ThreadPoolExecutor(
CPU_CORE_NUM * 8,
CPU_CORE_NUM * 15,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3000),
new CustomThreadFactory("point-grant-pool", false),
new CustomRejectedExecutionHandler("point-grant-pool")
);
this.userProfileExecutor = new ThreadPoolExecutor(
CPU_CORE_NUM * 2,
CPU_CORE_NUM * 4,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new CustomThreadFactory("user-profile-pool", false),
new CustomRejectedExecutionHandler("user-profile-pool")
);
}
}
2.3.4 线程池监控实现
package com.jam.demo.monitor;
import com.jam.demo.manager.ThreadPoolManager;
import com.jam.demo.producer.OrderEventProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Component
public class ThreadPoolMonitor {
private final ThreadPoolManager threadPoolManager;
private final OrderEventProducer orderEventProducer;
public ThreadPoolMonitor(ThreadPoolManager threadPoolManager, OrderEventProducer orderEventProducer) {
this.threadPoolManager = threadPoolManager;
this.orderEventProducer = orderEventProducer;
}
@Scheduled(fixedRate = 60000)
public void printThreadPoolMetrics() {
log.info("====================线程池监控指标====================");
printExecutorMetrics("order-core-pool", threadPoolManager.getOrderCoreExecutor());
printExecutorMetrics("sms-notify-pool", threadPoolManager.getSmsNotifyExecutor());
printExecutorMetrics("logistics-sync-pool", threadPoolManager.getLogisticsSyncExecutor());
printExecutorMetrics("point-grant-pool", threadPoolManager.getPointGrantExecutor());
printExecutorMetrics("user-profile-pool", threadPoolManager.getUserProfileExecutor());
log.info("事件队列积压数:{},剩余容量:{}", orderEventProducer.getQueueSize(), orderEventProducer.getRemainingCapacity());
log.info("======================================================");
}
private void printExecutorMetrics(String poolName, ThreadPoolExecutor executor) {
log.info("[{}] 核心线程数:{},最大线程数:{},活跃线程数:{},完成任务数:{},队列积压数:{},拒绝任务数:{}",
poolName,
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getActiveCount(),
executor.getCompletedTaskCount(),
executor.getQueue().size(),
executor.getRejectedExecutionHandler().getClass().getSimpleName());
}
}
三、完整业务落地:订单系统的高并发实现
3.1 业务架构设计
3.2 数据库表设计(MySQL 8.0)
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`order_no` varchar(64) NOT NULL COMMENT '订单号',
`user_id` bigint NOT NULL COMMENT '用户ID',
`sku_id` bigint NOT NULL COMMENT '商品SKU ID',
`buy_num` int NOT NULL COMMENT '购买数量',
`order_amount` decimal(12,2) NOT NULL COMMENT '订单金额',
`order_status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态 0-待支付 1-已支付 2-已发货 3-已完成 4-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表';
CREATE TABLE `t_sku_stock` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`sku_id` bigint NOT NULL COMMENT '商品SKU ID',
`stock_num` int NOT NULL DEFAULT '0' COMMENT '库存数量',
`lock_stock_num` int NOT NULL DEFAULT '0' COMMENT '锁定库存数量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_sku_id` (`sku_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品库存表';
3.3 实体类定义
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "订单号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "商品SKU ID")
private Long skuId;
@Schema(description = "购买数量")
private Integer buyNum;
@Schema(description = "订单金额")
private BigDecimal orderAmount;
@Schema(description = "订单状态")
private Integer orderStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@TableName("t_sku_stock")
@Schema(description = "库存实体")
public class SkuStock implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "商品SKU ID")
private Long skuId;
@Schema(description = "库存数量")
private Integer stockNum;
@Schema(description = "锁定库存数量")
private Integer lockStockNum;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
3.4 Mapper接口定义
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
public interface OrderMapper extends BaseMapper<Order> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.SkuStock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
public interface SkuStockMapper extends BaseMapper<SkuStock> {
@Update("UPDATE t_sku_stock SET stock_num = stock_num - #{buyNum}, lock_stock_num = lock_stock_num + #{buyNum} WHERE sku_id = #{skuId} AND stock_num >= #{buyNum}")
int deductStock(@Param("skuId") Long skuId, @Param("buyNum") Integer buyNum);
}
3.5 核心订单服务实现
package com.jam.demo.service;
import com.jam.demo.entity.Order;
import com.jam.demo.manager.ThreadPoolManager;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.mapper.SkuStockMapper;
import com.jam.demo.model.OrderEvent;
import com.jam.demo.producer.OrderEventProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.UUID;
@Slf4j
@Service
public class OrderService {
private final OrderMapper orderMapper;
private final SkuStockMapper skuStockMapper;
private final TransactionTemplate transactionTemplate;
private final OrderEventProducer orderEventProducer;
private final ThreadPoolManager threadPoolManager;
public OrderService(OrderMapper orderMapper, SkuStockMapper skuStockMapper, TransactionTemplate transactionTemplate, OrderEventProducer orderEventProducer, ThreadPoolManager threadPoolManager) {
this.orderMapper = orderMapper;
this.skuStockMapper = skuStockMapper;
this.transactionTemplate = transactionTemplate;
this.orderEventProducer = orderEventProducer;
this.threadPoolManager = threadPoolManager;
}
/**
* 创建订单
* @param userId 用户ID
* @param skuId 商品SKU ID
* @param buyNum 购买数量
* @return 订单号
*/
public String createOrder(Long userId, Long skuId, Integer buyNum) {
if (org.springframework.util.ObjectUtils.isEmpty(userId) || org.springframework.util.ObjectUtils.isEmpty(skuId) || org.springframework.util.ObjectUtils.isEmpty(buyNum) || buyNum <= 0) {
throw new IllegalArgumentException("订单参数异常");
}
String orderNo = UUID.randomUUID().toString().replace("-", "");
Boolean result = transactionTemplate.execute(status -> {
try {
int deductResult = skuStockMapper.deductStock(skuId, buyNum);
if (deductResult <= 0) {
log.warn("库存扣减失败,skuId:{}, buyNum:{}", skuId, buyNum);
status.setRollbackOnly();
return false;
}
Order order = new Order();
order.setOrderNo(orderNo);
order.setUserId(userId);
order.setSkuId(skuId);
order.setBuyNum(buyNum);
order.setOrderAmount(java.math.BigDecimal.valueOf(100).multiply(java.math.BigDecimal.valueOf(buyNum)));
order.setOrderStatus(0);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("订单创建成功,orderNo:{}", orderNo);
return true;
} catch (Exception e) {
log.error("订单创建异常,orderNo:{}", orderNo, e);
status.setRollbackOnly();
return false;
}
});
if (Boolean.TRUE.equals(result)) {
publishOrderEvent(orderNo, userId);
return orderNo;
} else {
throw new RuntimeException("订单创建失败");
}
}
/**
* 发布订单完成事件
* @param orderNo 订单号
* @param userId 用户ID
*/
private void publishOrderEvent(String orderNo, Long userId) {
String[] eventTypes = {"SMS_NOTIFY", "LOGISTICS_SYNC", "POINT_GRANT", "USER_PROFILE_UPDATE"};
for (String eventType : eventTypes) {
OrderEvent event = OrderEvent.builder()
.eventId(UUID.randomUUID().toString().replace("-", ""))
.orderNo(orderNo)
.userId(userId)
.eventType(eventType)
.retryTimes(0)
.maxRetryTimes(3)
.createTime(LocalDateTime.now())
.build();
orderEventProducer.sendEvent(event);
}
}
}
3.6 订单控制器实现
package com.jam.demo.controller;
import com.jam.demo.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/order")
@Tag(name = "订单管理", description = "订单核心操作接口")
public class OrderController {
private final OrderService orderService;
public OrderController(OrderService orderService) {
this.orderService = orderService;
}
@PostMapping("/create")
@Operation(summary = "创建订单", description = "用户下单创建订单接口")
public ResponseEntity<String> createOrder(
@Parameter(description = "用户ID", required = true) @RequestParam Long userId,
@Parameter(description = "商品SKU ID", required = true) @RequestParam Long skuId,
@Parameter(description = "购买数量", required = true) @RequestParam Integer buyNum) {
String orderNo = orderService.createOrder(userId, skuId, buyNum);
return ResponseEntity.ok(orderNo);
}
}
3.7 配置文件(application.yml)
server:
port: 8080
shutdown: graceful
spring:
application:
name: high-concurrency-demo
datasource:
url: jdbc:mysql://127.0.0.1:3306/demo_order?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
lifecycle:
timeout-per-shutdown-phase: 30s
springdoc:
swagger-ui:
path: /swagger-ui.html
enabled: true
api-docs:
enabled: true
path: /v3/api-docs
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
四、高并发落地避坑指南
4.1 阻塞队列使用的核心坑点
- 无界队列导致OOM:LinkedBlockingQueue默认容量为Integer.MAX_VALUE,生产速度超过消费速度时,队列会无限膨胀,最终触发OOM。生产环境必须为所有阻塞队列强制指定容量。
- 队列容量设置不合理:容量过小会频繁触发拒绝策略,容量过大会导致任务积压、接口RT升高,甚至触发Full GC。队列容量需根据压测结果设置,公式参考:
队列容量 = 核心线程数 * 单任务平均执行时间 * 峰值QPS。
4.2 线程池使用的核心坑点
- 共用线程池导致雪崩:核心业务与非核心业务共用线程池,非核心业务的慢任务会占满线程池,导致核心业务无法执行。必须按业务场景与任务类型做线程池隔离。
- 线程数设置不合理:CPU密集型任务设置过多线程,会导致频繁的上下文切换,CPU利用率拉满但吞吐量骤降;IO密集型任务设置过少线程,会导致CPU资源浪费。CPU密集型任务核心线程数建议设置为
CPU核心数+1,IO密集型任务建议设置为CPU核心数 * 阻塞系数/(1-阻塞系数),阻塞系数通常为0.8-0.9。 - 使用Executors创建线程池:Executors创建的线程池均存在风险,FixedThreadPool和SingleThreadPool使用无界队列,CachedThreadPool使用无界最大线程数,都会导致OOM。生产环境必须使用ThreadPoolExecutor手动创建线程池。
4.3 生产者-消费者模式的核心坑点
- 虚假唤醒导致逻辑异常:使用if判断wait()的条件,而非while循环,线程被虚假唤醒后,条件不满足仍会继续执行,导致数据异常。JDK官方强制要求wait()必须放在while循环中,每次唤醒后重新判断条件。
- 消费线程异常退出:消费者业务逻辑未捕获异常,抛出RuntimeException后,消费线程会直接终止,无法继续消费,导致任务无限积压。消费者的业务逻辑必须用try-catch捕获所有异常,记录日志并做重试/降级处理,保证消费线程不会退出。
- 优雅停机导致任务丢失:服务直接使用kill -9终止,消费者线程立即停止,队列中未处理的任务会直接丢失。必须实现优雅停机,服务停止时先关闭生产者,再等待消费者处理完队列中的存量任务,最后关闭线程池,使用kill -15终止服务。
4.4 线程池隔离的核心坑点
- 隔离粒度过细/过粗:隔离粒度过细会导致线程数过多,上下文切换频繁;隔离粒度过粗会导致隔离失效。建议按业务域+任务类型做隔离,核心业务单独隔离,非核心业务可合并隔离。
- 拒绝策略使用不当:核心业务使用DiscardPolicy会导致任务丢失,非核心业务使用AbortPolicy会影响用户体验。核心写业务建议使用CallerRunsPolicy实现降级,非核心业务可使用DiscardPolicy,同时记录告警日志。
五、总结
生产者-消费者范式与线程池隔离,是高并发系统的两大核心支柱。生产者-消费者通过异步解耦,实现了系统的削峰填谷与吞吐量提升;线程池隔离通过舱壁模式,实现了系统的故障隔离与稳定性保障。二者结合,才能构建出既能扛住高并发流量冲击,又能保证业务连续性的生产级系统。 本文从底层JDK线程通信机制,到完整的电商订单业务落地,完整拆解了两大范式的实现逻辑与生产级规范。高并发编程的核心,从来不是炫技式的API调用,而是对底层原理的深刻理解,以及对生产环境风险的提前预判,只有把基础打牢,才能写出真正稳定、高性能的高并发代码。