高并发系统的核心骨架:生产者 - 消费者范式与线程池隔离的生产级落地全解

简介: 本文深度解析高并发系统两大核心:生产者-消费者范式(实现异步解耦、削峰填谷)与线程池隔离(基于舱壁模式保障故障隔离)。涵盖synchronized/Condition/BlockingQueue三种实现、JUC队列选型、优雅停机、监控告警及典型避坑指南,附完整电商订单落地代码。

引言

在高并发业务场景中,系统的吞吐量、稳定性与可维护性,核心取决于异步解耦与故障隔离能力的落地。绝大多数线上高并发故障,本质都源于两个核心问题:一是同步调用导致的链路阻塞与响应超时,二是共用资源池引发的级联雪崩。生产者-消费者范式是异步解耦的核心骨架,线程池隔离则是故障隔离的黄金法则,二者结合,才能构建出能扛住百万级QPS、同时保证业务连续性的高并发系统。本文从底层原理到生产级代码实现,完整拆解这两个核心范式的落地逻辑,帮你彻底打通高并发编程的任督二脉。


一、生产者-消费者范式:从底层原理到生产级实现

1.1 范式的核心本质与业务价值

生产者-消费者是一种基于异步解耦的并发设计范式,核心是通过一个线程安全的中间缓冲区(队列),完全分离数据生产逻辑与数据消费逻辑。生产者仅需将数据写入缓冲区,无需关心消费逻辑的实现、执行时机与扩容策略;消费者仅需从缓冲区拉取数据处理,无需关心数据的生产来源与触发逻辑。

其核心业务价值集中在三点:

  1. 解耦:生产者与消费者无直接代码依赖,仅依赖缓冲区契约,双方可独立迭代、扩容与故障隔离,单一方异常不会直接导致全链路阻塞。
  2. 异步化:同步转异步,大幅缩短核心接口响应时长,将非核心逻辑从主链路剥离,提升用户体验。
  3. 削峰填谷:流量峰值期,生产者将请求写入缓冲区,消费者以固定处理能力匀速消费,避免峰值流量直接打垮下游数据库与第三方服务,是秒杀、大促场景的核心兜底能力。

1.2 底层线程通信机制全解析

生产者与消费者运行在不同线程中,必须解决线程间的同步与通信问题,避免出现数据丢失、重复消费、死锁与虚假唤醒等问题。Java中主流的线程通信机制分为三层,从底层原生实现到生产级封装逐层递进。

1.2.1 基于synchronized + wait/notify的原生实现

这是Java线程通信的底层基础,基于对象的监视器锁(Monitor)实现,JDK17中对synchronized做了全链路优化,从偏向锁、轻量级锁到重量级锁的自适应升级,性能已与显式Lock无显著差距。

核心规范(JDK官方强制要求):

  • wait()、notify()、notifyAll()必须在synchronized同步块内执行,否则会抛出IllegalMonitorStateException
  • wait()必须放在while循环中判断条件,禁止使用if判断,避免虚假唤醒导致的逻辑异常
  • 优先使用notifyAll()而非notify(),避免信号丢失导致的线程永久阻塞

package com.jam.demo.base;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Queue;
@Slf4j
public class WaitNotifyModel {
   private final Queue<Object> buffer;
   private final int capacity;
   public WaitNotifyModel(int capacity) {
       this.buffer = new LinkedList<>();
       this.capacity = capacity;
   }
   /**
    * 生产者生产数据
    * @param data 待生产的数据
    * @throws InterruptedException 线程中断异常
    */

   public synchronized void produce(Object data) throws InterruptedException {
       while (buffer.size() == capacity) {
           this.wait();
       }
       buffer.offer(data);
       log.info("生产者生产数据,当前缓冲区大小:{}", buffer.size());
       this.notifyAll();
   }
   /**
    * 消费者消费数据
    * @return 消费的数据
    * @throws InterruptedException 线程中断异常
    */

   public synchronized Object consume() throws InterruptedException {
       while (buffer.isEmpty()) {
           this.wait();
       }
       Object data = buffer.poll();
       log.info("消费者消费数据,当前缓冲区大小:{}", buffer.size());
       this.notifyAll();
       return data;
   }
}

1.2.2 基于Lock + Condition的精准唤醒实现

显式Lock与Condition组合,解决了synchronized只能有一个等待队列的缺陷,支持多条件队列实现精准唤醒,减少无效的线程上下文切换,底层基于JUC的AQS(抽象队列同步器)实现。

核心优势:可分别创建生产者等待队列(notFull)与消费者等待队列(notEmpty),缓冲区满时仅阻塞生产者,缓冲区空时仅阻塞消费者,唤醒时仅唤醒对应队列的线程,无需全量唤醒。

package com.jam.demo.base;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class LockConditionModel {
   private final Queue<Object> buffer;
   private final int capacity;
   private final Lock lock;
   private final Condition notFull;
   private final Condition notEmpty;
   public LockConditionModel(int capacity) {
       this.buffer = new LinkedList<>();
       this.capacity = capacity;
       this.lock = new ReentrantLock();
       this.notFull = lock.newCondition();
       this.notEmpty = lock.newCondition();
   }
   /**
    * 生产者生产数据
    * @param data 待生产的数据
    * @throws InterruptedException 线程中断异常
    */

   public void produce(Object data) throws InterruptedException {
       lock.lockInterruptibly();
       try {
           while (buffer.size() == capacity) {
               notFull.await();
           }
           buffer.offer(data);
           log.info("生产者生产数据,当前缓冲区大小:{}", buffer.size());
           notEmpty.signal();
       } finally {
           lock.unlock();
       }
   }
   /**
    * 消费者消费数据
    * @return 消费的数据
    * @throws InterruptedException 线程中断异常
    */

   public Object consume() throws InterruptedException {
       lock.lockInterruptibly();
       try {
           while (buffer.isEmpty()) {
               notEmpty.await();
           }
           Object data = buffer.poll();
           log.info("消费者消费数据,当前缓冲区大小:{}", buffer.size());
           notFull.signal();
           return data;
       } finally {
           lock.unlock();
       }
   }
}

1.2.3 基于JUC BlockingQueue的生产级封装

生产环境99%的场景,无需手动实现线程同步逻辑,直接使用JUC提供的BlockingQueue阻塞队列即可,其内部已完整实现了线程同步、等待唤醒与内存可见性保证,是生产者-消费者范式的标准实现载体。

核心特性:

  • 缓冲区满时,生产者线程自动阻塞,直到缓冲区出现空闲位置
  • 缓冲区空时,消费者线程自动阻塞,直到缓冲区写入新数据
  • 所有操作均为线程安全,无并发安全问题
阻塞队列选型指南(生产级)
队列实现类 底层结构 有界/无界 核心特性 生产适用场景
ArrayBlockingQueue 定长数组 有界 单锁实现,读写不分离,支持公平锁,内存占用固定 容量固定的常规业务场景,严格控制内存占用
LinkedBlockingQueue 链表 可选有界/无界 双锁实现,读写分离,吞吐量更高,默认无界(高危) 高吞吐量场景,必须强制指定容量
SynchronousQueue 无缓冲 有界(容量0) 不存储元素,生产必须等待消费,一对一传递 短平快的无堆积任务场景,CachedThreadPool默认队列
DelayQueue 优先级堆 无界 延迟消费,元素必须实现Delayed接口 超时订单关闭、定时任务重试、缓存过期清理
PriorityBlockingQueue 二叉堆 无界 按优先级排序,支持自定义比较器 VIP订单优先处理、分级任务调度场景
核心API选型规范

BlockingQueue提供了四类操作API,生产环境需严格按场景选型,禁止乱用:

操作类型 抛出异常 返回特殊值 无限阻塞 超时等待
数据插入 add(e) offer(e) put(e) offer(e, timeout, unit)
数据移除 remove() poll() take() poll(timeout, unit)
数据查看 element() peek() 不支持 不支持

生产环境强制规范:

  • 插入操作优先使用offer(e, timeout, unit),禁止无限阻塞的put(e),设置超时时间实现降级兜底,避免生产者线程永久阻塞
  • 移除操作优先使用poll(timeout, unit),禁止无限阻塞的take(),方便优雅停机与线程生命周期管理
  • 绝对禁止使用无界队列,除非能100%保证生产速度永远不超过消费速度,否则必然导致OOM

1.3 生产级生产者-消费者完整实现

以电商订单异步事件处理为场景,实现带优雅停机、异常重试、监控告警的生产级代码,基于JDK17编写,完整可运行。

1.3.1 项目基础依赖(pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.4.0</version>
       <relativePath/>
   </parent>
   <groupId>com.jam</groupId>
   <artifactId>high-concurrency-demo</artifactId>
   <version>1.0.0</version>
   <name>high-concurrency-demo</name>
   <properties>
       <java.version>17</java.version>
       <guava.version>33.2.0-jre</guava.version>
       <fastjson2.version>2.0.53</fastjson2.version>
       <mybatis-plus.version>3.5.7</mybatis-plus.version>
       <springdoc.version>2.5.0</springdoc.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-jdbc</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-validation</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>${springdoc.version}</version>
       </dependency>
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>1.18.34</version>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

1.3.2 事件载体定义

package com.jam.demo.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "订单事件实体")
public class OrderEvent implements Serializable {
   @Serial
   private static final long serialVersionUID = 1L;
   @Schema(description = "事件ID")
   private String eventId;
   @Schema(description = "订单号")
   private String orderNo;
   @Schema(description = "用户ID")
   private Long userId;
   @Schema(description = "事件类型")
   private String eventType;
   @Schema(description = "事件内容")
   private String content;
   @Schema(description = "重试次数")
   private Integer retryTimes;
   @Schema(description = "最大重试次数")
   private Integer maxRetryTimes;
   @Schema(description = "创建时间")
   private LocalDateTime createTime;
}

1.3.3 事件生产者实现

package com.jam.demo.producer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.model.OrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class OrderEventProducer {
   private static final int QUEUE_CAPACITY = 10000;
   private static final long OFFER_TIMEOUT = 3L;
   private final LinkedBlockingQueue<OrderEvent> eventQueue;
   public OrderEventProducer() {
       this.eventQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
   }
   /**
    * 投递订单事件到缓冲区
    * @param event 订单事件
    * @return 投递结果
    */

   public boolean sendEvent(OrderEvent event) {
       if (org.springframework.util.ObjectUtils.isEmpty(event)) {
           log.warn("订单事件为空,投递失败");
           return false;
       }
       try {
           boolean result = eventQueue.offer(event, OFFER_TIMEOUT, TimeUnit.SECONDS);
           if (result) {
               log.info("订单事件投递成功,eventId:{},队列剩余容量:{}", event.getEventId(), eventQueue.remainingCapacity());
           } else {
               log.error("订单事件投递超时,事件内容:{}", JSON.toJSONString(event));
           }
           return result;
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           log.error("订单事件投递被中断,eventId:{}", event.getEventId(), e);
           return false;
       }
   }
   /**
    * 从缓冲区拉取事件(供消费者调用)
    * @param timeout 超时时间
    * @param unit 时间单位
    * @return 订单事件
    * @throws InterruptedException 线程中断异常
    */

   public OrderEvent pollEvent(long timeout, TimeUnit unit) throws InterruptedException {
       return eventQueue.poll(timeout, unit);
   }
   /**
    * 获取当前队列积压数量
    * @return 积压事件数
    */

   public int getQueueSize() {
       return eventQueue.size();
   }
   /**
    * 获取队列剩余容量
    * @return 剩余容量
    */

   public int getRemainingCapacity() {
       return eventQueue.remainingCapacity();
   }
}

1.3.4 事件消费者实现

package com.jam.demo.consumer;
import com.jam.demo.model.OrderEvent;
import com.jam.demo.producer.OrderEventProducer;
import com.jam.demo.manager.ThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
public class OrderEventConsumer implements ApplicationRunner, DisposableBean {
   private static final int CONSUMER_THREAD_NUM = 4;
   private static final long POLL_TIMEOUT = 5L;
   private final OrderEventProducer orderEventProducer;
   private final ThreadPoolManager threadPoolManager;
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
   public OrderEventConsumer(OrderEventProducer orderEventProducer, ThreadPoolManager threadPoolManager) {
       this.orderEventProducer = orderEventProducer;
       this.threadPoolManager = threadPoolManager;
   }
   @Override
   public void run(ApplicationArguments args) {
       for (int i = 0; i < CONSUMER_THREAD_NUM; i++) {
           Thread consumerThread = new Thread(this::doConsume, "order-event-consumer-" + i);
           consumerThread.setDaemon(false);
           consumerThread.start();
       }
       log.info("订单事件消费者启动完成,消费线程数:{}", CONSUMER_THREAD_NUM);
   }
   /**
    * 核心消费逻辑
    */

   private void doConsume() {
       while (isRunning.get() || orderEventProducer.getQueueSize() > 0) {
           try {
               OrderEvent event = orderEventProducer.pollEvent(POLL_TIMEOUT, TimeUnit.SECONDS);
               if (org.springframework.util.ObjectUtils.isEmpty(event)) {
                   continue;
               }
               dispatchEvent(event);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               log.error("消费线程被中断,线程名:{}", Thread.currentThread().getName(), e);
               break;
           } catch (Exception e) {
               log.error("消费事件发生未知异常", e);
           }
       }
       log.info("消费线程退出,线程名:{}", Thread.currentThread().getName());
   }
   /**
    * 事件分发到对应业务线程池
    * @param event 订单事件
    */

   private void dispatchEvent(OrderEvent event) {
       switch (event.getEventType()) {
           case "SMS_NOTIFY" -> threadPoolManager.getSmsNotifyExecutor().execute(() -> handleSmsNotify(event));
           case "LOGISTICS_SYNC" -> threadPoolManager.getLogisticsSyncExecutor().execute(() -> handleLogisticsSync(event));
           case "POINT_GRANT" -> threadPoolManager.getPointGrantExecutor().execute(() -> handlePointGrant(event));
           case "USER_PROFILE_UPDATE" -> threadPoolManager.getUserProfileExecutor().execute(() -> handleUserProfileUpdate(event));
           default -> log.warn("未知事件类型,event:{}", event);
       }
   }
   private void handleSmsNotify(OrderEvent event) {
       log.info("处理短信通知事件,orderNo:{}", event.getOrderNo());
   }
   private void handleLogisticsSync(OrderEvent event) {
       log.info("处理物流同步事件,orderNo:{}", event.getOrderNo());
   }
   private void handlePointGrant(OrderEvent event) {
       log.info("处理积分赠送事件,orderNo:{}", event.getOrderNo());
   }
   private void handleUserProfileUpdate(OrderEvent event) {
       log.info("处理用户画像更新事件,orderNo:{}", event.getOrderNo());
   }
   @Override
   public void destroy() {
       isRunning.set(false);
       log.info("消费者开始优雅停机,剩余待处理事件数:{}", orderEventProducer.getQueueSize());
   }
}


二、线程池隔离:高并发系统的舱壁模式落地

2.1 为什么必须做线程池隔离?

绝大多数高并发雪崩故障,都源于共用资源池的级联故障。在传统单线程池架构中,所有业务任务共用同一个线程池,一旦某一个慢任务(如第三方接口超时、慢SQL)占满了线程池,会导致所有业务任务都无法执行,最终整个服务不可用。

举个典型的电商场景:订单服务包含四个核心操作,下单扣库存、发送短信、同步物流、更新用户画像。如果物流第三方接口超时,导致线程池内的所有线程都阻塞在该接口调用上,那么核心的下单扣库存任务也无法获取线程执行,整个订单服务瘫痪,这就是典型的线程池雪崩。

线程池隔离的核心思想来自舱壁模式(Bulkhead Pattern),源于船舶设计:船舶的船舱被隔离为多个独立舱室,单个舱室进水不会导致整个船舶沉没。对应到系统中,就是将不同类型的业务任务,拆分到独立的线程池中执行,单个线程池因故障占满,不会影响其他线程池的任务执行,实现故障隔离,避免级联雪崩。

2.2 线程池核心原理与执行流程

线程池隔离的前提,是彻底理解线程池的核心参数与执行逻辑,否则隔离只会流于形式,无法真正实现故障隔离。

2.2.1 线程池核心参数

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler)

  1. corePoolSize:核心线程数,线程池常驻的线程数量,默认不会被回收
  2. maximumPoolSize:最大线程数,线程池能容纳的最大线程数量
  3. keepAliveTime:非核心线程的空闲存活时间,超过该时间未执行任务的非核心线程会被回收
  4. unit:存活时间的单位
  5. workQueue:工作队列,存放等待执行的任务
  6. threadFactory:线程工厂,用于创建线程,生产环境必须自定义,设置线程名称与异常处理器,方便问题排查
  7. handler:拒绝策略,当线程池与工作队列都满时,处理新任务的策略

2.2.2 线程池执行流程

2.2.3 拒绝策略选型指南

JDK提供了4种默认拒绝策略,生产环境需按业务场景严格选型:

  1. AbortPolicy:默认策略,直接抛出RejectedExecutionException异常,适合核心业务,及时感知故障
  2. CallerRunsPolicy:由调用线程执行任务,实现自动降级,避免任务丢失,适合核心写业务
  3. DiscardPolicy:直接丢弃任务,不抛出异常,适合非核心、可丢失的业务
  4. DiscardOldestPolicy:丢弃队列中最旧的任务,重试提交当前任务,适合优先级均匀的场景

2.3 生产级线程池隔离完整实现

2.3.1 自定义线程工厂

package com.jam.demo.factory;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CustomThreadFactory implements ThreadFactory {
   private final String threadNamePrefix;
   private final AtomicInteger threadNumber = new AtomicInteger(1);
   private final boolean isDaemon;
   public CustomThreadFactory(String threadNamePrefix, boolean isDaemon) {
       this.threadNamePrefix = threadNamePrefix;
       this.isDaemon = isDaemon;
   }
   @Override
   public Thread newThread(Runnable r) {
       String threadName = threadNamePrefix + "-" + threadNumber.getAndIncrement();
       Thread thread = new Thread(r, threadName);
       thread.setDaemon(isDaemon);
       thread.setUncaughtExceptionHandler((t, e) -> log.error("线程{}发生未捕获异常", t.getName(), e));
       return thread;
   }
}

2.3.2 自定义拒绝策略

package com.jam.demo.handler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
   private final String threadPoolName;
   public CustomRejectedExecutionHandler(String threadPoolName) {
       this.threadPoolName = threadPoolName;
   }
   @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
       log.error("线程池[{}]触发拒绝策略,核心线程数:{},最大线程数:{},活跃线程数:{},队列积压数:{}",
               threadPoolName,
               executor.getCorePoolSize(),
               executor.getMaximumPoolSize(),
               executor.getActiveCount(),
               executor.getQueue().size());
       if (!executor.isShutdown()) {
           r.run();
       }
   }
}

2.3.3 线程池管理器

package com.jam.demo.manager;
import com.jam.demo.factory.CustomThreadFactory;
import com.jam.demo.handler.CustomRejectedExecutionHandler;
import lombok.Getter;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Getter
@Component
public class ThreadPoolManager {
   private static final int CPU_CORE_NUM = Runtime.getRuntime().availableProcessors();
   private final ThreadPoolExecutor orderCoreExecutor;
   private final ThreadPoolExecutor smsNotifyExecutor;
   private final ThreadPoolExecutor logisticsSyncExecutor;
   private final ThreadPoolExecutor pointGrantExecutor;
   private final ThreadPoolExecutor userProfileExecutor;
   public ThreadPoolManager() {
       this.orderCoreExecutor = new ThreadPoolExecutor(
               CPU_CORE_NUM,
               CPU_CORE_NUM * 2,
               60L,
               TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(1000),
               new CustomThreadFactory("order-core-pool", false),
               new CustomRejectedExecutionHandler("order-core-pool")
       );
       this.smsNotifyExecutor = new ThreadPoolExecutor(
               CPU_CORE_NUM * 10,
               CPU_CORE_NUM * 20,
               60L,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(5000),
               new CustomThreadFactory("sms-notify-pool", false),
               new CustomRejectedExecutionHandler("sms-notify-pool")
       );
       this.logisticsSyncExecutor = new ThreadPoolExecutor(
               CPU_CORE_NUM * 5,
               CPU_CORE_NUM * 10,
               60L,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(2000),
               new CustomThreadFactory("logistics-sync-pool", false),
               new CustomRejectedExecutionHandler("logistics-sync-pool")
       );
       this.pointGrantExecutor = new ThreadPoolExecutor(
               CPU_CORE_NUM * 8,
               CPU_CORE_NUM * 15,
               60L,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(3000),
               new CustomThreadFactory("point-grant-pool", false),
               new CustomRejectedExecutionHandler("point-grant-pool")
       );
       this.userProfileExecutor = new ThreadPoolExecutor(
               CPU_CORE_NUM * 2,
               CPU_CORE_NUM * 4,
               60L,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(1000),
               new CustomThreadFactory("user-profile-pool", false),
               new CustomRejectedExecutionHandler("user-profile-pool")
       );
   }
}

2.3.4 线程池监控实现

package com.jam.demo.monitor;
import com.jam.demo.manager.ThreadPoolManager;
import com.jam.demo.producer.OrderEventProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Component
public class ThreadPoolMonitor {
   private final ThreadPoolManager threadPoolManager;
   private final OrderEventProducer orderEventProducer;
   public ThreadPoolMonitor(ThreadPoolManager threadPoolManager, OrderEventProducer orderEventProducer) {
       this.threadPoolManager = threadPoolManager;
       this.orderEventProducer = orderEventProducer;
   }
   @Scheduled(fixedRate = 60000)
   public void printThreadPoolMetrics() {
       log.info("====================线程池监控指标====================");
       printExecutorMetrics("order-core-pool", threadPoolManager.getOrderCoreExecutor());
       printExecutorMetrics("sms-notify-pool", threadPoolManager.getSmsNotifyExecutor());
       printExecutorMetrics("logistics-sync-pool", threadPoolManager.getLogisticsSyncExecutor());
       printExecutorMetrics("point-grant-pool", threadPoolManager.getPointGrantExecutor());
       printExecutorMetrics("user-profile-pool", threadPoolManager.getUserProfileExecutor());
       log.info("事件队列积压数:{},剩余容量:{}", orderEventProducer.getQueueSize(), orderEventProducer.getRemainingCapacity());
       log.info("======================================================");
   }
   private void printExecutorMetrics(String poolName, ThreadPoolExecutor executor) {
       log.info("[{}] 核心线程数:{},最大线程数:{},活跃线程数:{},完成任务数:{},队列积压数:{},拒绝任务数:{}",
               poolName,
               executor.getCorePoolSize(),
               executor.getMaximumPoolSize(),
               executor.getActiveCount(),
               executor.getCompletedTaskCount(),
               executor.getQueue().size(),
               executor.getRejectedExecutionHandler().getClass().getSimpleName());
   }
}


三、完整业务落地:订单系统的高并发实现

3.1 业务架构设计

3.2 数据库表设计(MySQL 8.0)

CREATE TABLE `t_order` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
 `order_no` varchar(64) NOT NULL COMMENT '订单号',
 `user_id` bigint NOT NULL COMMENT '用户ID',
 `sku_id` bigint NOT NULL COMMENT '商品SKU ID',
 `buy_num` int NOT NULL COMMENT '购买数量',
 `order_amount` decimal(12,2) NOT NULL COMMENT '订单金额',
 `order_status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态 0-待支付 1-已支付 2-已发货 3-已完成 4-已取消',
 `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uk_order_no` (`order_no`),
 KEY `idx_user_id` (`user_id`),
 KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表';

CREATE TABLE `t_sku_stock` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
 `sku_id` bigint NOT NULL COMMENT '商品SKU ID',
 `stock_num` int NOT NULL DEFAULT '0' COMMENT '库存数量',
 `lock_stock_num` int NOT NULL DEFAULT '0' COMMENT '锁定库存数量',
 `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uk_sku_id` (`sku_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品库存表';

3.3 实体类定义

package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order implements Serializable {
   @Serial
   private static final long serialVersionUID = 1L;
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;
   @Schema(description = "订单号")
   private String orderNo;
   @Schema(description = "用户ID")
   private Long userId;
   @Schema(description = "商品SKU ID")
   private Long skuId;
   @Schema(description = "购买数量")
   private Integer buyNum;
   @Schema(description = "订单金额")
   private BigDecimal orderAmount;
   @Schema(description = "订单状态")
   private Integer orderStatus;
   @Schema(description = "创建时间")
   private LocalDateTime createTime;
   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@TableName("t_sku_stock")
@Schema(description = "库存实体")
public class SkuStock implements Serializable {
   @Serial
   private static final long serialVersionUID = 1L;
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;
   @Schema(description = "商品SKU ID")
   private Long skuId;
   @Schema(description = "库存数量")
   private Integer stockNum;
   @Schema(description = "锁定库存数量")
   private Integer lockStockNum;
   @Schema(description = "创建时间")
   private LocalDateTime createTime;
   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

3.4 Mapper接口定义

package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
public interface OrderMapper extends BaseMapper<Order> {
}

package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.SkuStock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
public interface SkuStockMapper extends BaseMapper<SkuStock> {
   @Update("UPDATE t_sku_stock SET stock_num = stock_num - #{buyNum}, lock_stock_num = lock_stock_num + #{buyNum} WHERE sku_id = #{skuId} AND stock_num >= #{buyNum}")
   int deductStock(@Param("skuId") Long skuId, @Param("buyNum") Integer buyNum);
}

3.5 核心订单服务实现

package com.jam.demo.service;
import com.jam.demo.entity.Order;
import com.jam.demo.manager.ThreadPoolManager;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.mapper.SkuStockMapper;
import com.jam.demo.model.OrderEvent;
import com.jam.demo.producer.OrderEventProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.UUID;
@Slf4j
@Service
public class OrderService {
   private final OrderMapper orderMapper;
   private final SkuStockMapper skuStockMapper;
   private final TransactionTemplate transactionTemplate;
   private final OrderEventProducer orderEventProducer;
   private final ThreadPoolManager threadPoolManager;
   public OrderService(OrderMapper orderMapper, SkuStockMapper skuStockMapper, TransactionTemplate transactionTemplate, OrderEventProducer orderEventProducer, ThreadPoolManager threadPoolManager) {
       this.orderMapper = orderMapper;
       this.skuStockMapper = skuStockMapper;
       this.transactionTemplate = transactionTemplate;
       this.orderEventProducer = orderEventProducer;
       this.threadPoolManager = threadPoolManager;
   }
   /**
    * 创建订单
    * @param userId 用户ID
    * @param skuId 商品SKU ID
    * @param buyNum 购买数量
    * @return 订单号
    */

   public String createOrder(Long userId, Long skuId, Integer buyNum) {
       if (org.springframework.util.ObjectUtils.isEmpty(userId) || org.springframework.util.ObjectUtils.isEmpty(skuId) || org.springframework.util.ObjectUtils.isEmpty(buyNum) || buyNum <= 0) {
           throw new IllegalArgumentException("订单参数异常");
       }
       String orderNo = UUID.randomUUID().toString().replace("-", "");
       Boolean result = transactionTemplate.execute(status -> {
           try {
               int deductResult = skuStockMapper.deductStock(skuId, buyNum);
               if (deductResult <= 0) {
                   log.warn("库存扣减失败,skuId:{}, buyNum:{}", skuId, buyNum);
                   status.setRollbackOnly();
                   return false;
               }
               Order order = new Order();
               order.setOrderNo(orderNo);
               order.setUserId(userId);
               order.setSkuId(skuId);
               order.setBuyNum(buyNum);
               order.setOrderAmount(java.math.BigDecimal.valueOf(100).multiply(java.math.BigDecimal.valueOf(buyNum)));
               order.setOrderStatus(0);
               order.setCreateTime(LocalDateTime.now());
               order.setUpdateTime(LocalDateTime.now());
               orderMapper.insert(order);
               log.info("订单创建成功,orderNo:{}", orderNo);
               return true;
           } catch (Exception e) {
               log.error("订单创建异常,orderNo:{}", orderNo, e);
               status.setRollbackOnly();
               return false;
           }
       });
       if (Boolean.TRUE.equals(result)) {
           publishOrderEvent(orderNo, userId);
           return orderNo;
       } else {
           throw new RuntimeException("订单创建失败");
       }
   }
   /**
    * 发布订单完成事件
    * @param orderNo 订单号
    * @param userId 用户ID
    */

   private void publishOrderEvent(String orderNo, Long userId) {
       String[] eventTypes = {"SMS_NOTIFY", "LOGISTICS_SYNC", "POINT_GRANT", "USER_PROFILE_UPDATE"};
       for (String eventType : eventTypes) {
           OrderEvent event = OrderEvent.builder()
                   .eventId(UUID.randomUUID().toString().replace("-", ""))
                   .orderNo(orderNo)
                   .userId(userId)
                   .eventType(eventType)
                   .retryTimes(0)
                   .maxRetryTimes(3)
                   .createTime(LocalDateTime.now())
                   .build();
           orderEventProducer.sendEvent(event);
       }
   }
}

3.6 订单控制器实现

package com.jam.demo.controller;
import com.jam.demo.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/order")
@Tag(name = "订单管理", description = "订单核心操作接口")
public class OrderController {
   private final OrderService orderService;
   public OrderController(OrderService orderService) {
       this.orderService = orderService;
   }
   @PostMapping("/create")
   @Operation(summary = "创建订单", description = "用户下单创建订单接口")
   public ResponseEntity<String> createOrder(
           @Parameter(description = "用户ID", required = true)
@RequestParam Long userId,
           @Parameter(description = "商品SKU ID", required = true) @RequestParam Long skuId,
           @Parameter(description = "购买数量", required = true) @RequestParam Integer buyNum) {
       String orderNo = orderService.createOrder(userId, skuId, buyNum);
       return ResponseEntity.ok(orderNo);
   }
}

3.7 配置文件(application.yml)

server:
 port: 8080
 shutdown: graceful
spring:
 application:
   name: high-concurrency-demo
 datasource:
   url: jdbc:mysql://127.0.0.1:3306/demo_order?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
   username: root
   password: root
   driver-class-name: com.mysql.cj.jdbc.Driver
 lifecycle:
   timeout-per-shutdown-phase: 30s
springdoc:
 swagger-ui:
   path: /swagger-ui.html
   enabled: true
 api-docs:
   enabled: true
   path: /v3/api-docs
mybatis-plus:
 mapper-locations: classpath*:/mapper/**/*.xml
 configuration:
   map-underscore-to-camel-case: true
   log-impl: org.apache.ibatis.logging.stdout.StdOutImpl


四、高并发落地避坑指南

4.1 阻塞队列使用的核心坑点

  • 无界队列导致OOM:LinkedBlockingQueue默认容量为Integer.MAX_VALUE,生产速度超过消费速度时,队列会无限膨胀,最终触发OOM。生产环境必须为所有阻塞队列强制指定容量。
  • 队列容量设置不合理:容量过小会频繁触发拒绝策略,容量过大会导致任务积压、接口RT升高,甚至触发Full GC。队列容量需根据压测结果设置,公式参考:队列容量 = 核心线程数 * 单任务平均执行时间 * 峰值QPS

4.2 线程池使用的核心坑点

  • 共用线程池导致雪崩:核心业务与非核心业务共用线程池,非核心业务的慢任务会占满线程池,导致核心业务无法执行。必须按业务场景与任务类型做线程池隔离。
  • 线程数设置不合理:CPU密集型任务设置过多线程,会导致频繁的上下文切换,CPU利用率拉满但吞吐量骤降;IO密集型任务设置过少线程,会导致CPU资源浪费。CPU密集型任务核心线程数建议设置为CPU核心数+1,IO密集型任务建议设置为CPU核心数 * 阻塞系数/(1-阻塞系数),阻塞系数通常为0.8-0.9。
  • 使用Executors创建线程池:Executors创建的线程池均存在风险,FixedThreadPool和SingleThreadPool使用无界队列,CachedThreadPool使用无界最大线程数,都会导致OOM。生产环境必须使用ThreadPoolExecutor手动创建线程池。

4.3 生产者-消费者模式的核心坑点

  • 虚假唤醒导致逻辑异常:使用if判断wait()的条件,而非while循环,线程被虚假唤醒后,条件不满足仍会继续执行,导致数据异常。JDK官方强制要求wait()必须放在while循环中,每次唤醒后重新判断条件。
  • 消费线程异常退出:消费者业务逻辑未捕获异常,抛出RuntimeException后,消费线程会直接终止,无法继续消费,导致任务无限积压。消费者的业务逻辑必须用try-catch捕获所有异常,记录日志并做重试/降级处理,保证消费线程不会退出。
  • 优雅停机导致任务丢失:服务直接使用kill -9终止,消费者线程立即停止,队列中未处理的任务会直接丢失。必须实现优雅停机,服务停止时先关闭生产者,再等待消费者处理完队列中的存量任务,最后关闭线程池,使用kill -15终止服务。

4.4 线程池隔离的核心坑点

  • 隔离粒度过细/过粗:隔离粒度过细会导致线程数过多,上下文切换频繁;隔离粒度过粗会导致隔离失效。建议按业务域+任务类型做隔离,核心业务单独隔离,非核心业务可合并隔离。
  • 拒绝策略使用不当:核心业务使用DiscardPolicy会导致任务丢失,非核心业务使用AbortPolicy会影响用户体验。核心写业务建议使用CallerRunsPolicy实现降级,非核心业务可使用DiscardPolicy,同时记录告警日志。

五、总结

生产者-消费者范式与线程池隔离,是高并发系统的两大核心支柱。生产者-消费者通过异步解耦,实现了系统的削峰填谷与吞吐量提升;线程池隔离通过舱壁模式,实现了系统的故障隔离与稳定性保障。二者结合,才能构建出既能扛住高并发流量冲击,又能保证业务连续性的生产级系统。 本文从底层JDK线程通信机制,到完整的电商订单业务落地,完整拆解了两大范式的实现逻辑与生产级规范。高并发编程的核心,从来不是炫技式的API调用,而是对底层原理的深刻理解,以及对生产环境风险的提前预判,只有把基础打牢,才能写出真正稳定、高性能的高并发代码。

目录
相关文章
|
9天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5351 11
|
17天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
21486 116
|
13天前
|
人工智能 安全 前端开发
Team 版 OpenClaw:HiClaw 开源,5 分钟完成本地安装
HiClaw 基于 OpenClaw、Higress AI Gateway、Element IM 客户端+Tuwunel IM 服务器(均基于 Matrix 实时通信协议)、MinIO 共享文件系统打造。
8200 7

热门文章

最新文章