在分布式系统中,消息队列就像“交通枢纽”,承接生产者的消息、调度消费者的消费节奏,是解耦、削峰、异步通信的核心组件。但一旦出现“百万消息积压几小时”的问题,就相当于交通枢纽彻底瘫痪——下游业务无法获取消息、数据一致性被破坏、甚至引发连锁故障,直接影响用户体验和业务连续性。
作为常年和分布式架构打交道的开发者,我曾在生产环境中多次处理过消息积压问题,小到几万条消息的短暂阻塞,大到百万级消息积压4小时的紧急故障,总结出了一套“紧急止血→根源排查→彻底解决→复盘优化”的全流程方案。
一、先搞懂:消息积压的核心本质
很多开发者遇到积压就慌,盲目扩容消费者、重启服务,结果越搞越乱——其实消息积压的本质很简单,用一句话就能说透:消息生产速度 ≥ 消息消费速度,且积压量超过了消息队列的缓冲能力,导致消息在队列中持续堆积。
类比一下:消息队列就像小区的快递柜,生产者是快递员,消费者是取快递的业主。正常情况下,快递员送快递的速度(生产速度),和业主取快递的速度(消费速度)基本匹配,快递柜不会满;但如果快递员突然批量送百万个包裹(生产者突增),或者业主都在家不出来取件(消费者消费慢/挂掉),快递柜很快就会被堆满,后续的快递只能排队等待,这就是“消息积压”。
1.1 积压的3个核心前提
- 生产速度 > 消费速度:这是最常见的原因,比如大促期间,订单生产者每秒产生1000条消息,而消费者每秒只能处理100条,差值会持续累积,最终导致积压;
- 消费端故障:消费者服务挂掉、线程池阻塞、数据库宕机,导致消费完全停止,即使生产速度正常,消息也会持续堆积;
- 队列配置不合理:队列分区过少、消息拉取策略不当、死信队列未处理,导致消费端无法充分利用资源,即使消费者正常,也无法高效消费消息。
1.2 积压的底层影响
很多人觉得“积压几小时没事”,其实积压的危害会持续放大,甚至引发级联故障:
- 消息超时:大部分消息队列(RocketMQ、Kafka)的消息都有超时时间,积压过久会导致消息过期,被丢弃或进入死信队列,引发业务数据丢失;
- 队列撑满:消息队列的存储容量有限(比如磁盘满),积压过久会导致队列无法接收新消息,生产者报错,上游业务瘫痪;
- 消费雪崩:积压的消息过多时,若消费者突然恢复,会一次性拉取大量消息,导致消费者线程池满、CPU飙升、服务宕机,积压问题进一步恶化;
- 数据不一致:比如订单消息积压,下游库存服务无法及时扣减库存,可能导致超卖、漏卖,后续需要大量人力复盘对账。
1.3 积压排查流程图(必用,快速定位原因)
遇到积压问题,先不要急着解决,先排查原因——用下面这个流程图,3分钟就能定位到积压的核心原因,避免盲目操作:
二、紧急止血:30分钟内快速缓解积压(优先保业务)
当百万消息积压几小时,核心诉求是“快速缓解,避免业务进一步恶化”——这一步的核心思路是:暂时切断非核心压力,最大化提升消费能力,快速消化积压消息,相当于“先找临时快递员,把堆积的快递先拉走一部分,缓解快递柜压力”。
紧急止血的操作优先级:暂停非核心生产者 → 临时扩容消费者 → 消息分流 → 跳过无效消息,四步走,30分钟内就能让积压量快速下降,具体操作如下(附实例)。
2.1 第一步:暂停非核心生产者(减少压力源)
积压的核心是“生产>消费”,所以第一步要做的是“减少生产”——暂停非核心业务的生产者,只保留核心业务的消息生产,避免积压量持续增加。
2.1.1 操作场景
比如电商系统中,百万订单消息积压,此时可以暂停“订单评价、物流通知”等非核心业务的生产者,只保留“订单创建、支付回调”等核心业务的生产者,让消费端集中精力处理核心消息。
2.1.2 实例
基于JDK17、SpringBoot3.2.3、Swagger3,实现生产者的动态启停,通过接口控制,无需重启服务,符合生产环境使用规范:
package com.jam.demo.producer.controller;
import com.jam.demo.producer.service.OrderProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 生产者控制接口(用于动态启停非核心生产者)
* @author ken
*/
@RestController
@RequestMapping("/producer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ProducerControlController", description = "生产者动态控制接口")
public class ProducerControlController {
private final OrderProducerService orderProducerService;
/**
* 启停非核心生产者(如订单评价、物流通知)
* @param enable true-启动,false-暂停
* @return 操作结果
*/
@PostMapping("/nonCore/enable")
@Operation(summary = "启停非核心生产者", description = "控制非核心业务的消息生产,缓解积压")
public String enableNonCoreProducer(@RequestParam Boolean enable) {
if (ObjectUtils.isEmpty(enable)) {
log.error("启停参数不能为空");
return "参数错误:enable不能为空";
}
orderProducerService.setNonCoreEnable(enable);
String result = enable ? "非核心生产者已启动" : "非核心生产者已暂停";
log.info(result);
return result;
}
/**
* 查看非核心生产者状态
* @return 状态描述
*/
@PostMapping("/nonCore/status")
@Operation(summary = "查看非核心生产者状态", description = "获取当前非核心生产者的启停状态")
public String getNonCoreProducerStatus() {
boolean enable = orderProducerService.getNonCoreEnable();
return enable ? "非核心生产者正在运行" : "非核心生产者已暂停";
}
}
package com.jam.demo.producer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.NonCoreMessage;
import com.jam.demo.producer.config.RocketMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 订单生产者服务(核心+非核心)
* @author ken
*/
@Service
@Slf4j
public class OrderProducerService {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 非核心生产者开关(原子类保证线程安全)
*/
private final AtomicBoolean nonCoreEnable = new AtomicBoolean(true);
/**
* 发送核心消息(订单创建、支付回调)
* @param message 核心消息实体
*/
public void sendCoreMessage(Object message) {
if (ObjectUtils.isEmpty(message)) {
log.error("核心消息不能为空");
return;
}
try {
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.CORE_ORDER_TOPIC,
JSON.toJSONString(message),
3000
);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("核心消息发送成功,消息ID:{}", sendResult.getMsgId());
} else {
log.error("核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), message);
}
} catch (Exception e) {
log.error("核心消息发送异常,消息内容:{}", message, e);
// 核心消息发送失败,可触发重试(避免核心数据丢失)
retrySendCoreMessage(message);
}
}
/**
* 发送非核心消息(订单评价、物流通知)
* @param nonCoreMessage 非核心消息实体
*/
public void sendNonCoreMessage(NonCoreMessage nonCoreMessage) {
// 若开关关闭,直接返回,不发送非核心消息
if (!nonCoreEnable.get()) {
log.warn("非核心生产者已暂停,消息暂不发送:{}", nonCoreMessage);
return;
}
if (ObjectUtils.isEmpty(nonCoreMessage)) {
log.error("非核心消息不能为空");
return;
}
try {
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.NON_CORE_ORDER_TOPIC,
JSON.toJSONString(nonCoreMessage),
2000
);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("非核心消息发送成功,消息ID:{}", sendResult.getMsgId());
} else {
log.error("非核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), nonCoreMessage);
}
} catch (Exception e) {
log.error("非核心消息发送异常,消息内容:{}", nonCoreMessage, e);
// 非核心消息发送失败,无需重试,避免增加积压压力
}
}
/**
* 核心消息重试发送(最多重试3次)
* @param message 核心消息实体
*/
private void retrySendCoreMessage(Object message) {
int retryCount = 0;
while (retryCount < 3) {
try {
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.CORE_ORDER_TOPIC,
JSON.toJSONString(message),
3000
);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("核心消息重试发送成功,重试次数:{},消息ID:{}", retryCount + 1, sendResult.getMsgId());
return;
}
} catch (Exception e) {
log.error("核心消息重试发送异常,重试次数:{},消息内容:{}", retryCount + 1, message, e);
}
retryCount++;
// 重试间隔:100ms,避免频繁重试占用资源
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("核心消息重试间隔异常", e);
break;
}
}
log.error("核心消息重试3次均失败,消息内容:{},请人工处理", message);
}
/**
* 设置非核心生产者开关状态
* @param enable 开关状态
*/
public void setNonCoreEnable(Boolean enable) {
nonCoreEnable.set(enable);
}
/**
* 获取非核心生产者开关状态
* @return 开关状态
*/
public boolean getNonCoreEnable() {
return nonCoreEnable.get();
}
}
2.1.3 关键说明
- 用AtomicBoolean保证非核心生产者开关的线程安全,避免多线程操作导致状态错乱;
- 核心消息发送失败会重试3次(间隔100ms),非核心消息不重试,避免增加积压压力;
- 通过Swagger3接口动态控制,访问http://localhost:8080/swagger-ui/index.html即可操作,无需重启服务;
- 依赖配置(pom.xml,仅展示核心依赖,完整依赖见文末):
<!-- SpringBoot父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/>
</parent>
<!-- JDK版本 -->
<properties>
<java.version>17</java.version>
<rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
<mybatis-plus-boot-starter.version>3.5.5.1</mybatis-plus-boot-starter.version>
<fastjson2.version>2.0.39</fastjson2.version>
<lombok.version>1.18.30</lombok.version>
<swagger.version>2.2.0</swagger.version>
</properties>
<!-- 核心依赖 -->
<dependencies>
<!-- SpringBoot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
<!-- FastJSON2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- MyBatisPlus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
</dependencies>
2.2 第二步:临时扩容消费者(最大化提升消费能力)
暂停非核心生产者后,下一步是“提升消费速度”——临时扩容消费者,让更多的“取件人”一起处理积压的消息,这是紧急止血最有效的手段。
2.2.1 扩容的核心原则
- 消费者数量 ≤ 队列分区数:大部分消息队列(Kafka、RocketMQ)的消费模型是“一个分区只能被一个消费者组的一个消费者消费”,如果消费者数量超过分区数,多余的消费者会处于空闲状态,无法提升消费速度;
- 扩容时避免重复消费:通过消息队列的偏移量(offset)机制,确保扩容后的消费者能从正确的位置开始消费,避免重复消费(比如RocketMQ的广播模式需谨慎,优先用集群模式);
- 临时扩容用容器化部署:生产环境中,用Docker+K8s快速扩容消费者实例,积压缓解后再缩容,避免资源浪费。
2.2.2 实例(动态调整消费者线程数,无需重启服务)
基于SpringBoot+RocketMQ,实现消费者线程数的动态调整,结合Swagger3接口,无需重启服务,快速提升消费能力:
package com.jam.demo.consumer.controller;
import com.jam.demo.consumer.service.OrderConsumerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 消费者控制接口(动态调整消费线程数)
* @author ken
*/
@RestController
@RequestMapping("/consumer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ConsumerControlController", description = "消费者动态控制接口")
public class ConsumerControlController {
private final OrderConsumerService orderConsumerService;
/**
* 动态调整核心消息消费者线程数
* @param threadNum 线程数(1-50,根据服务器性能调整)
* @return 操作结果
*/
@PostMapping("/core/threadNum")
@Operation(summary = "调整核心消费者线程数", description = "动态调整核心消息(订单创建、支付回调)的消费线程数,提升消费速度")
public String adjustCoreConsumerThreadNum(@RequestParam Integer threadNum) {
if (ObjectUtils.isEmpty(threadNum) || threadNum < 1 || threadNum > 50) {
log.error("线程数参数错误,线程数必须在1-50之间,当前参数:{}", threadNum);
return "参数错误:线程数必须为1-50之间的整数";
}
orderConsumerService.setCoreConsumerThreadNum(threadNum);
log.info("核心消费者线程数已调整为:{}", threadNum);
return "核心消费者线程数调整成功,当前线程数:" + threadNum;
}
/**
* 查看核心消费者线程数
* @return 线程数描述
*/
@PostMapping("/core/threadNum/status")
@Operation(summary = "查看核心消费者线程数", description = "获取当前核心消息消费者的线程数")
public String getCoreConsumerThreadNum() {
int threadNum = orderConsumerService.getCoreConsumerThreadNum();
return "当前核心消费者线程数:" + threadNum;
}
}
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 核心消息消费者服务(订单创建、支付回调)
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderConsumerService implements RocketMQListener<String> {
private final OrderMapper orderMapper;
private final PlatformTransactionManager transactionManager;
@Resource
private ThreadPoolTaskExecutor consumerThreadPool;
/**
* 核心消费者线程数(原子类保证线程安全)
*/
private final AtomicInteger coreConsumerThreadNum = new AtomicInteger(10);
/**
* 初始化消费者线程池(默认10线程)
*/
@PostConstruct
public void initThreadPool() {
consumerThreadPool.setCorePoolSize(coreConsumerThreadNum.get());
consumerThreadPool.setMaxPoolSize(coreConsumerThreadNum.get() + 5);
consumerThreadPool.setQueueCapacity(1000);
log.info("核心消费者线程池初始化完成,默认线程数:{}", coreConsumerThreadNum.get());
}
/**
* 消费核心消息(RocketMQ监听方法)
* @param message 消息内容(JSON格式)
*/
@Override
@RocketMQMessageListener(
topic = RocketMQConfig.CORE_ORDER_TOPIC,
consumerGroup = RocketMQConfig.CORE_ORDER_CONSUMER_GROUP,
// 广播模式:所有消费者都消费同一消息;集群模式:消息只被一个消费者消费(优先集群模式,避免重复消费)
messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
// 批量消费:每次拉取10条消息,提升消费速度(根据业务调整)
consumeMessageBatchMaxSize = 10,
// 消费超时时间:30秒(避免消费耗时过长导致消息重试)
consumeTimeout = 30
)
public void onMessage(String message) {
if (ObjectUtils.isEmpty(message)) {
log.error("消费核心消息失败:消息内容为空");
return;
}
// 提交到线程池异步消费,提升消费速度
consumerThreadPool.execute(() -> processCoreMessage(message));
}
/**
* 处理核心消息(业务逻辑+编程式事务)
* @param message 消息内容(JSON格式)
*/
private void processCoreMessage(String message) {
// 编程式事务(保证消息消费与数据库操作的一致性)
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
try {
// 解析消息(FastJSON2)
CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
log.error("解析核心消息失败:消息格式错误,消息内容:{}", message);
// 事务回滚
transactionManager.rollback(transactionStatus);
return;
}
Order order = coreMessage.getOrder();
log.info("开始消费核心消息,订单ID:{},消息内容:{}", order.getOrderId(), message);
// 业务逻辑:更新订单状态(模拟核心业务)
LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
.eq(Order::getOrderId, order.getOrderId())
.set(Order::getStatus, order.getStatus())
.set(Order::getUpdateTime, System.currentTimeMillis());
int updateCount = orderMapper.update(null, updateWrapper);
if (updateCount == 0) {
log.error("消费核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
transactionManager.rollback(transactionStatus);
return;
}
// 事务提交
transactionManager.commit(transactionStatus);
log.info("消费核心消息成功,订单ID:{}", order.getOrderId());
} catch (Exception e) {
log.error("消费核心消息异常,消息内容:{}", message, e);
// 事务回滚
transactionManager.rollback(transactionStatus);
// 消息消费失败,抛出异常,触发RocketMQ重试(重试次数由队列配置决定)
throw new RuntimeException("消费核心消息异常,触发重试", e);
}
}
/**
* 设置核心消费者线程数(动态调整)
* @param threadNum 线程数
*/
public void setCoreConsumerThreadNum(Integer threadNum) {
coreConsumerThreadNum.set(threadNum);
// 调整线程池参数
consumerThreadPool.setCorePoolSize(threadNum);
consumerThreadPool.setMaxPoolSize(threadNum + 5);
}
/**
* 获取核心消费者线程数
* @return 线程数
*/
public int getCoreConsumerThreadNum() {
return coreConsumerThreadNum.get();
}
}
2.2.3 线程池配置(SpringBoot配置类)
package com.jam.demo.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 消费者线程池配置类
* @author ken
*/
@Configuration
public class ThreadPoolConfig {
/**
* 核心消息消费者线程池
* @return 线程池实例
*/
@Bean
public ThreadPoolTaskExecutor consumerThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程名称前缀(便于日志排查)
executor.setThreadNamePrefix("core-consumer-thread-");
// 拒绝策略:丢弃任务并抛出异常(核心消息消费失败需及时感知)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 线程空闲时间:60秒
executor.setKeepAliveSeconds(60);
// 等待所有任务完成后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间:30秒(超时强制关闭)
executor.setAwaitTerminationSeconds(30);
return executor;
}
}
2.2.4 关键说明
- 用ThreadPoolTaskExecutor创建消费者线程池,动态调整核心线程数,无需重启服务;
- 采用编程式事务(PlatformTransactionManager),保证消息消费与数据库操作的一致性,避免数据不一致;
- RocketMQ监听配置:集群模式(避免重复消费)、批量消费(每次拉取10条)、消费超时30秒,符合生产环境最佳实践;
- 线程池拒绝策略用AbortPolicy(丢弃任务并抛异常),核心消息消费失败需及时感知,避免无声失败;
- 扩容建议:如果队列分区数为20,可将消费者线程数调整为20(最大化利用分区),同时用K8s扩容2-3个消费者实例,进一步提升消费速度。
2.3 第三步:消息分流(避免核心消息被阻塞)
如果积压的消息中,有大量非核心消息(比如日志、通知),会阻塞核心消息的消费——此时需要将核心消息和非核心消息分流,让核心消息优先被消费,非核心消息后续慢慢处理。
2.3.1 分流的核心思路
- 创建临时队列:新建一个临时消息队列(比如CORE_ORDER_TOPIC_TEMP),用于接收核心消息;
- 消息转移:将原队列中的核心消息,批量转移到临时队列,让临时消费者组专门处理;
- 原队列清理:将原队列中的非核心消息暂停消费,或批量删除(如果无需处理),减少积压压力。
2.3.2 实例(RocketMQ消息批量转移)
基于RocketMQ的Admin API,实现消息批量转移,将原队列中的核心消息转移到临时队列,可直接运行:
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.consumer.config.RocketMQConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.admin.MQAdmin;
import org.apache.rocketmq.client.admin.MQAdminExt;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
/**
* 消息分流服务(核心消息转移到临时队列)
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class MessageShuntService {
@Resource
private MQAdminExt mqAdmin;
private DefaultMQPullConsumer pullConsumer;
private DefaultMQProducer pushProducer;
/**
* 初始化拉取消费者和推送生产者(用于消息转移)
*/
@PostConstruct
public void init() throws MQClientException {
// 初始化拉取消费者(拉取原队列中的消息)
pullConsumer = new DefaultMQPullConsumer("message_shunt_pull_consumer_group");
pullConsumer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
pullConsumer.start();
log.info("消息分流-拉取消费者初始化完成");
// 初始化推送生产者(将核心消息推送到临时队列)
pushProducer = new DefaultMQProducer("message_shunt_push_producer_group");
pushProducer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
pushProducer.start();
log.info("消息分流-推送生产者初始化完成");
}
/**
* 消息分流:将原核心队列中的核心消息,转移到临时队列
* @param batchSize 每次拉取的消息数量(批量处理,提升效率)
* @return 转移结果(转移成功数量、失败数量)
*/
public String shuntCoreMessage(int batchSize) {
if (batchSize <= 0 || batchSize > 100) {
log.error("消息分流失败:批量大小错误,必须为1-100之间的整数,当前参数:{}", batchSize);
return "参数错误:批量大小必须为1-100之间的整数";
}
int successCount = 0;
int failCount = 0;
try {
// 1. 获取原核心队列的所有消息队列(分区)
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(RocketMQConfig.CORE_ORDER_TOPIC);
if (CollectionUtils.isEmpty(messageQueues)) {
log.error("消息分流失败:未找到原核心队列的分区");
return "消息分流失败:未找到原核心队列的分区";
}
// 2. 遍历每个分区,拉取消息并转移
for (MessageQueue messageQueue : messageQueues) {
log.info("开始处理分区:{},队列名称:{}", messageQueue.getQueueId(), messageQueue.getTopic());
// 获取分区的最小偏移量(从最开始拉取,避免遗漏)
long offset = pullConsumer.minOffset(messageQueue);
log.info("分区{}的最小偏移量:{}", messageQueue.getQueueId(), offset);
while (true) {
// 批量拉取消息(每次拉取batchSize条)
PullResult pullResult = pullConsumer.pull(
messageQueue,
"*", // 订阅所有tag
offset,
batchSize
);
// 判断拉取状态
if (PullStatus.FOUND.equals(pullResult.getPullStatus())) {
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
if (!CollectionUtils.isEmpty(messageExtList)) {
for (MessageExt messageExt : messageExtList) {
try {
// 解析消息,判断是否为核心消息(这里模拟核心消息包含order字段)
String messageBody = new String(messageExt.getBody(), "UTF-8");
CoreMessage coreMessage = JSON.parseObject(messageBody, CoreMessage.class);
if (!ObjectUtils.isEmpty(coreMessage) && !ObjectUtils.isEmpty(coreMessage.getOrder())) {
// 是核心消息,转移到临时队列
Message tempMessage = new Message(
RocketMQConfig.CORE_ORDER_TOPIC_TEMP, // 临时队列topic
messageExt.getTags(),
messageExt.getKeys(),
messageBody.getBytes("UTF-8")
);
// 发送到临时队列
SendResult sendResult = pushProducer.send(tempMessage, 3000);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
successCount++;
log.info("消息转移成功,消息ID:{},分区:{}", sendResult.getMsgId(), messageQueue.getQueueId());
} else {
failCount++;
log.error("消息转移失败,消息ID:{},发送状态:{}", messageExt.getMsgId(), sendResult.getSendStatus());
}
} else {
// 非核心消息,跳过(可根据业务需求删除或暂存)
log.warn("非核心消息,跳过转移,消息内容:{}", messageBody);
}
} catch (Exception e) {
failCount++;
log.error("消息转移异常,消息ID:{}", messageExt.getMsgId(), e);
}
}
}
// 更新偏移量,继续拉取下一批
offset = pullResult.getNextBeginOffset();
} else if (PullStatus.NO_NEW_MSG.equals(pullResult.getPullStatus())) {
// 该分区没有新消息,退出循环
log.info("分区{}没有新消息,退出处理", messageQueue.getQueueId());
break;
} else if (PullStatus.OFFSET_ILLEGAL.equals(pullResult.getPullStatus())) {
// 偏移量非法,重置为最小偏移量
offset = pullConsumer.minOffset(messageQueue);
log.warn("分区{}偏移量非法,重置为最小偏移量:{}", messageQueue.getQueueId(), offset);
}
}
}
return String.format("消息分流完成,转移成功:%d条,转移失败:%d条", successCount, failCount);
} catch (Exception e) {
log.error("消息分流整体异常", e);
return String.format("消息分流异常,已转移成功:%d条,失败:%d条,异常信息:%s", successCount, failCount, e.getMessage());
}
}
/**
* 关闭拉取消费者和推送生产者(用于服务停止时释放资源)
*/
public void close() {
if (pullConsumer != null) {
pullConsumer.shutdown();
log.info("消息分流-拉取消费者已关闭");
}
if (pushProducer != null) {
pushProducer.shutdown();
log.info("消息分流-推送生产者已关闭");
}
}
}
2.3.3 临时队列消费者配置(专门消费转移后的核心消息)
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
/**
* 临时队列消费者服务(消费转移后的核心消息)
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class TempCoreMessageConsumer implements RocketMQListener<String> {
private final OrderMapper orderMapper;
private final PlatformTransactionManager transactionManager;
/**
* 消费临时队列中的核心消息
* @param message 消息内容(JSON格式)
*/
@Override
@RocketMQMessageListener(
topic = RocketMQConfig.CORE_ORDER_TOPIC_TEMP,
consumerGroup = RocketMQConfig.CORE_ORDER_TEMP_CONSUMER_GROUP,
messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
consumeMessageBatchMaxSize = 15, // 临时队列,批量消费数量增加到15,加快消费
consumeTimeout = 20
)
public void onMessage(String message) {
if (ObjectUtils.isEmpty(message)) {
log.error("消费临时队列消息失败:消息内容为空");
return;
}
// 编程式事务
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
try {
CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
log.error("解析临时队列消息失败:消息格式错误,消息内容:{}", message);
transactionManager.rollback(transactionStatus);
return;
}
Order order = coreMessage.getOrder();
log.info("开始消费临时队列核心消息,订单ID:{}", order.getOrderId());
// 业务逻辑:更新订单状态(与核心消费者一致)
LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
.eq(Order::getOrderId, order.getOrderId())
.set(Order::getStatus, order.getStatus())
.set(Order::getUpdateTime, System.currentTimeMillis());
int updateCount = orderMapper.update(null, updateWrapper);
if (updateCount == 0) {
log.error("消费临时队列消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
transactionManager.rollback(transactionStatus);
return;
}
transactionManager.commit(transactionStatus);
log.info("消费临时队列核心消息成功,订单ID:{}", order.getOrderId());
} catch (Exception e) {
log.error("消费临时队列消息异常,消息内容:{}", message, e);
transactionManager.rollback(transactionStatus);
throw new RuntimeException("消费临时队列消息异常,触发重试", e);
}
}
}
2.3.4 关键说明
- 用MQAdminExt、DefaultMQPullConsumer拉取原队列消息,DefaultMQProducer将核心消息推送到临时队列,实现消息分流;
- 批量拉取消息(每次1-100条),提升分流效率,避免单条拉取耗时过长;
- 临时队列的批量消费数量设置为15(高于原队列的10),加快核心消息的消费速度;
- 非核心消息可根据业务需求,选择跳过、删除或暂存到其他队列,避免阻塞核心消息。
2.4 第四步:跳过无效消息(减少无效消费)
如果积压的消息中,有大量无效消息(比如重复消息、过期消息、格式错误消息),这些消息会占用消费资源,导致有效消息消费变慢——此时需要跳过无效消息,让消费者只处理有效消息。
2.4.1 无效消息的判断标准(根据业务定义)
- 过期消息:消息的创建时间超过业务允许的有效期(比如订单消息超过24小时,无需处理);
- 重复消息:消息ID重复(通过Redis或数据库记录已消费的消息ID,避免重复处理);
- 格式错误消息:消息JSON解析失败,或缺少核心字段(比如订单消息缺少订单ID)。
2.4.2 实例(跳过无效消息,结合Redis去重)
基于Redis(最新稳定版7.2.4),实现消息去重和无效消息过滤,可直接编译运行:
package com.jam.demo.consumer.service;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* 核心消息消费者(带无效消息过滤+Redis去重)
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class FilteredOrderConsumerService implements RocketMQListener<String> {
private final OrderMapper orderMapper;
private final PlatformTransactionManager transactionManager;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* Redis key前缀(已消费消息ID)
*/
private static final String CONSUMED_MESSAGE_KEY_PREFIX = "consumer:core:message:consumed:";
/**
* 消息有效期(24小时,单位:毫秒)
*/
private static final long MESSAGE_VALID_TIME = 24 * 60 * 60 * 1000;
/**
* 消费核心消息(带过滤无效消息)
* @param message 消息内容(JSON格式)
*/
@Override
@RocketMQMessageListener(
topic = RocketMQConfig.CORE_ORDER_TOPIC,
consumerGroup = RocketMQConfig.FILTERED_CORE_ORDER_CONSUMER_GROUP,
messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
consumeMessageBatchMaxSize = 12,
consumeTimeout = 25
)
public void onMessage(String message) {
if (ObjectUtils.isEmpty(message)) {
log.error("消费核心消息失败:消息内容为空(无效消息)");
return;
}
try {
// 1. 解析消息基本信息
CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder()) || ObjectUtils.isEmpty(coreMessage.getMsgId())) {
log.error("消费核心消息失败:消息格式错误(无效消息),消息内容:{}", message);
return;
}
String msgId = coreMessage.getMsgId();
Order order = coreMessage.getOrder();
long messageCreateTime = coreMessage.getCreateTime();
// 2. 过滤无效消息
// 2.1 过滤过期消息(创建时间超过24小时)
if (System.currentTimeMillis() - messageCreateTime > MESSAGE_VALID_TIME) {
log.warn("跳过过期消息,消息ID:{},订单ID:{},创建时间:{}", msgId, order.getOrderId(), messageCreateTime);
return;
}
// 2.2 过滤重复消息(Redis去重,过期时间24小时)
String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
Boolean isConsumed = stringRedisTemplate.hasKey(redisKey);
if (Boolean.TRUE.equals(isConsumed)) {
log.warn("跳过重复消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());
return;
}
// 3. 有效消息,开始处理(编程式事务)
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
try {
log.info("开始消费有效核心消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());
// 业务逻辑:更新订单状态
LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
.eq(Order::getOrderId, order.getOrderId())
.set(Order::getStatus, order.getStatus())
.set(Order::getUpdateTime, System.currentTimeMillis());
int updateCount = orderMapper.update(null, updateWrapper);
if (updateCount == 0) {
log.error("消费有效核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
transactionManager.rollback(transactionStatus);
return;
}
// 事务提交
transactionManager.commit(transactionStatus);
log.info("消费有效核心消息成功,订单ID:{}", order.getOrderId());
// 记录已消费消息ID到Redis,设置24小时过期(避免重复消费)
stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
} catch (Exception e) {
log.error("消费有效核心消息异常,消息ID:{},订单ID:{}", msgId, order.getOrderId(), e);
transactionManager.rollback(transactionStatus);
throw new RuntimeException("消费有效核心消息异常,触发重试", e);
}
} catch (Exception e) {
log.error("过滤/消费核心消息整体异常,消息内容:{}", message, e);
}
}
}
2.4.3 关键说明
- Redis去重逻辑:用消息唯一ID(msgId)作为Redis Key,值为固定标识,过期时间24小时(与消息有效期一致),避免永久占用Redis内存;
- 无效消息过滤优先级:先判断消息是否为空→再解析格式→再过滤过期→最后过滤重复,层层筛选,只处理有效消息;
- 异常隔离:过滤阶段的异常不触发消息重试(比如格式错误、过期),只有有效消息处理失败才触发重试,避免无效消息反复重试占用消费资源;
- Redis依赖配置(pom.xml):
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis连接池(提升性能) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
2.5 紧急止血效果验证(必做,确认积压缓解)
紧急止血操作完成后,需通过3个维度验证效果,避免“假缓解”:
- 队列监控维度:查看消息队列的积压数量(Offset差值)是否持续下降,生产TPS ≤ 消费TPS;
- 消费端维度:查看消费者线程池利用率(80%左右为宜)、消费成功率(≥99%)、无大量重试日志;
- 业务维度:下游业务(如订单状态更新、库存扣减)恢复正常,无数据不一致问题。
三、根源排查:找到积压的“真凶”(避免重复踩坑)
紧急止血只是“治标”,只有找到积压的根本原因,才能“治本”。结合前文的排查流程图,从消费端、生产端、队列配置三个维度逐一排查,每个维度都有明确的排查方法和验证手段。
3.1 消费端问题排查(占比80%,最常见)
消费端是积压的最主要原因,核心排查方向:
3.1.1 消费服务是否存活
- 排查方法:通过K8s/Docker查看消费者实例是否运行、端口是否存活、有无OOM/Kill日志;
- 验证命令(Linux):
# 查看消费者进程
ps -ef | grep core-consumer
# 查看端口占用
netstat -tulpn | grep 8081
# 查看OOM日志
dmesg | grep -i oom | grep core-consumer
- 解决方案:重启挂掉的实例、扩容实例、调整JVM参数(如-Xmx4g -Xms4g)避免OOM。
3.1.2 消费线程池是否阻塞
- 排查方法:通过Arthas(阿里开源诊断工具)查看线程池状态(活跃线程数、队列长度、拒绝次数);
- 验证命令(Arthas):
# 启动Arthas
java -jar arthas-boot.jar
# 选择消费者进程
# 查看线程池信息
thread -n 10 # 查看最繁忙的10个线程
# 查看线程池详细参数
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getCorePoolSize()'
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getActiveCount()'
- 解决方案:动态扩容线程池、优化阻塞的业务逻辑(如慢SQL、远程调用超时)。
3.1.3 业务逻辑耗时过长
- 排查方法:通过日志/链路追踪(SkyWalking)查看消费方法的耗时,定位慢操作(如慢SQL、第三方接口超时);
- 慢SQL排查(MySQL8.0):
-- 开启慢查询日志
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- 耗时1秒以上的SQL记录
-- 查看慢查询日志
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
-- 分析SQL执行计划
EXPLAIN UPDATE `order` SET status = 2, update_time = 1740000000000 WHERE order_id = '123456';
- 解决方案:优化慢SQL(加索引、分库分表)、设置远程调用超时(如Feign设置1秒超时)、异步处理非核心业务。
3.2 生产端问题排查(占比15%)
生产端问题主要是“生产突增”或“重复生产”:
3.2.1 生产TPS突增
- 排查方法:查看生产者监控(如Prometheus+Grafana),确认是否有大促、秒杀等峰值流量;
- 解决方案:限流(如Sentinel)、削峰(如生产者端增加本地队列)、降级非核心生产业务。
3.2.2 重复生产
- 排查方法:查看生产者日志,确认是否有“发送失败-重试”循环,或消息ID重复;
- 解决方案:生产者端增加幂等性(如基于订单ID去重)、设置合理的重试次数(如3次)、避免无限重试。
3.3 队列配置问题排查(占比5%)
队列配置问题容易被忽略,但会导致“扩容无效”:
3.3.1 分区数不足
- 排查方法:查看队列分区数(如RocketMQ):
# RocketMQ查看Topic分区数
sh mqadmin topicStatus -n 127.0.0.1:9876 -t CORE_ORDER_TOPIC
- 解决方案:扩容分区数(注意:RocketMQ/Kafka分区数扩容后,需重新分配消费者)。
3.3.2 拉取策略不合理
- 排查方法:查看消费者拉取配置(如批量拉取数、拉取间隔);
- 解决方案:调整批量拉取数(如从10增加到20)、缩短拉取间隔(如从500ms缩短到200ms)。
四、彻底解决:从架构层面避免积压(治本)
找到根源后,从架构、配置、代码三个层面优化,彻底避免百万消息积压问题,核心思路是“提升消费能力、控制生产速度、增加容错机制”。
4.1 架构层面优化
4.1.1 消费者集群化+弹性伸缩
- 方案:基于K8s实现消费者的弹性伸缩,根据积压数量自动扩容/缩容;
- 配置示例(K8s HPA):
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: core-consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: core-consumer
minReplicas: 2
maxReplicas: 10
metrics:
- type: External
external:
metric:
name: rocketmq_topic_backlog
selector:
matchLabels:
topic: CORE_ORDER_TOPIC
target:
type: Value
value: 10000 # 积压数超过1万,自动扩容
4.1.2 消息分级+优先级消费
- 方案:将消息分为核心(订单、支付)、普通(物流)、低优先级(日志),不同优先级消息使用不同队列,核心队列配置更多分区和消费者;
- 架构图:
4.2 配置层面优化
4.2.1 合理设置队列参数
| 参数 | 推荐值(RocketMQ) | 说明 |
| 批量消费数 | 10-20 | 提升消费效率,避免单条消费 |
| 消费超时时间 | 20-30秒 | 避免消费耗时过长导致重试 |
| 重试次数 | 3次 | 避免无限重试增加积压 |
| 死信队列开启 | 是 | 失败消息进入死信,避免阻塞 |
4.2.2 消费者线程池配置
- 核心线程数 = CPU核心数 * 2(如8核CPU,核心线程数16);
- 最大线程数 = 核心线程数 + 5;
- 队列容量 = 1000(避免队列过长导致阻塞);
- 拒绝策略 = AbortPolicy(核心消息)/DiscardOldestPolicy(非核心消息)。
4.3 代码层面优化
4.3.1 消费业务异步化
将消费中的非核心业务(如日志、通知)异步处理,减少主流程耗时:
/**
* 异步处理非核心业务(日志记录)
* @author ken
*/
private void asyncProcessNonCoreBusiness(Order order) {
// 异步线程池(单独配置,不占用核心消费线程)
nonCoreThreadPool.execute(() -> {
try {
logService.recordOrderLog(order.getOrderId(), order.getStatus());
notificationService.sendOrderNotify(order.getUserId(), order.getOrderId());
} catch (Exception e) {
log.error("异步处理非核心业务失败,订单ID:{}", order.getOrderId(), e);
}
});
}
4.3.2 消费幂等性保障
基于MySQL+Redis实现双重幂等,避免重复消费导致数据不一致:
/**
* 检查消费幂等性(MySQL+Redis)
* @param msgId 消息ID
* @param orderId 订单ID
* @return true-可消费,false-重复消费
* @author ken
*/
private boolean checkIdempotent(String msgId, String orderId) {
// 1. Redis快速判断(第一层)
String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(redisKey))) {
return false;
}
// 2. MySQL数据库判断(第二层,防Redis宕机)
int count = orderMapper.countConsumedMsg(msgId, orderId);
if (count > 0) {
return stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS); // 同步到Redis
}
return true;
}
对应的MyBatisPlus Mapper:
package com.jam.demo.consumer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.common.entity.Order;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
/**
* 订单Mapper
* @author ken
*/
@Repository
public interface OrderMapper extends BaseMapper<Order> {
/**
* 检查消息是否已消费(幂等性)
* @param msgId 消息ID
* @param orderId 订单ID
* @return 已消费次数
*/
int countConsumedMsg(@Param("msgId") String msgId, @Param("orderId") String orderId);
}
对应的Mapper XML(resources/mapper/OrderMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.demo.consumer.mapper.OrderMapper">
<select id="countConsumedMsg" resultType="java.lang.Integer">
SELECT COUNT(1) FROM `order_consume_record`
WHERE msg_id = #{msgId} AND order_id = #{orderId}
</select>
</mapper>
对应的MySQL表结构(MySQL8.0):
CREATE TABLE `order_consume_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`order_id` varchar(64) NOT NULL COMMENT '订单ID',
`consume_time` bigint NOT NULL COMMENT '消费时间(毫秒)',
`consumer_ip` varchar(32) NOT NULL COMMENT '消费者IP',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_order` (`msg_id`,`order_id`) COMMENT '消息ID+订单ID唯一索引(幂等性)',
KEY `idx_order_id` (`order_id`) COMMENT '订单ID索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单消费记录(幂等性)';
五、复盘优化:建立预防机制
解决积压问题后,必须进行复盘,建立“监控-预警-应急”三位一体的预防机制,让积压问题“早发现、早处理、不扩大”。
5.1 完善监控体系
核心监控指标(需配置可视化面板,如Grafana):
| 维度 | 核心指标 | 预警阈值 |
| 队列 | 积压数量、生产TPS、消费TPS | 积压>1万触发预警 |
| 消费端 | 消费成功率、消费耗时、线程池利用率 | 成功率<99%、耗时>2秒 |
| 生产端 | 生产成功率、重试次数 | 重试次数>100次/分钟 |
| 系统 | JVM内存、CPU利用率、磁盘使用率 | CPU>80%、内存>85% |
5.2 配置分级预警
通过钉钉/企业微信配置分级预警,避免信息过载:
- 一级预警(短信+电话):积压>10万、消费TPS=0、服务宕机;
- 二级预警(钉钉群):积压>1万、消费成功率<99%、耗时>2秒;
- 三级预警(日志):积压>5000、生产重试次数增加。
5.3 制定应急手册
将本文的紧急止血流程整理成标准化应急手册,包含:
- 应急联系人(开发、运维、DBA);
- 操作步骤(暂停非核心生产→扩容消费者→消息分流→跳过无效消息);
- 验证方法(监控指标、业务验证);
- 回滚方案(扩容后缩容、恢复非核心生产)。
总结
- 紧急止血核心:暂停非核心生产减少压力源,扩容消费者+消息分流提升消费能力,跳过无效消息减少资源浪费,30分钟内可快速缓解百万消息积压;
- 根源排查重点:80%的积压源于消费端(服务挂掉、线程池阻塞、慢业务),需通过Arthas、慢查询日志、队列监控逐一定位;
- 长期优化关键:架构上实现消费者弹性伸缩+消息分级消费,代码上保障幂等性+异步化,运营上建立监控-预警-应急体系,从“治标”到“治本”,彻底避免积压问题。