百万消息积压 4 小时,我靠这套方案快速止血

简介: 本文针对分布式系统中百万级消息积压问题,提出了一套完整的解决方案。首先分析了消息积压的本质是生产速度超过消费速度,并阐述了积压的危害。随后详细介绍了"紧急止血→根源排查→彻底解决→复盘优化"的四步处理流程:通过暂停非核心生产者、扩容消费者、消息分流和跳过无效消息快速缓解积压;从消费端、生产端和队列配置三个维度排查根本原因;从架构、配置和代码层面提出长期优化方案;最后强调建立监控预警体系的重要性。文章提供了大量生产环境验证的代码示例和技术方案,帮助开发者系统性地解决消息积压问题。

在分布式系统中,消息队列就像“交通枢纽”,承接生产者的消息、调度消费者的消费节奏,是解耦、削峰、异步通信的核心组件。但一旦出现“百万消息积压几小时”的问题,就相当于交通枢纽彻底瘫痪——下游业务无法获取消息、数据一致性被破坏、甚至引发连锁故障,直接影响用户体验和业务连续性。

作为常年和分布式架构打交道的开发者,我曾在生产环境中多次处理过消息积压问题,小到几万条消息的短暂阻塞,大到百万级消息积压4小时的紧急故障,总结出了一套“紧急止血→根源排查→彻底解决→复盘优化”的全流程方案。

一、先搞懂:消息积压的核心本质

很多开发者遇到积压就慌,盲目扩容消费者、重启服务,结果越搞越乱——其实消息积压的本质很简单,用一句话就能说透:消息生产速度 ≥ 消息消费速度,且积压量超过了消息队列的缓冲能力,导致消息在队列中持续堆积

类比一下:消息队列就像小区的快递柜,生产者是快递员,消费者是取快递的业主。正常情况下,快递员送快递的速度(生产速度),和业主取快递的速度(消费速度)基本匹配,快递柜不会满;但如果快递员突然批量送百万个包裹(生产者突增),或者业主都在家不出来取件(消费者消费慢/挂掉),快递柜很快就会被堆满,后续的快递只能排队等待,这就是“消息积压”。

1.1 积压的3个核心前提

  1. 生产速度 > 消费速度:这是最常见的原因,比如大促期间,订单生产者每秒产生1000条消息,而消费者每秒只能处理100条,差值会持续累积,最终导致积压;
  2. 消费端故障:消费者服务挂掉、线程池阻塞、数据库宕机,导致消费完全停止,即使生产速度正常,消息也会持续堆积;
  3. 队列配置不合理:队列分区过少、消息拉取策略不当、死信队列未处理,导致消费端无法充分利用资源,即使消费者正常,也无法高效消费消息。

1.2 积压的底层影响

很多人觉得“积压几小时没事”,其实积压的危害会持续放大,甚至引发级联故障:

  • 消息超时:大部分消息队列(RocketMQ、Kafka)的消息都有超时时间,积压过久会导致消息过期,被丢弃或进入死信队列,引发业务数据丢失;
  • 队列撑满:消息队列的存储容量有限(比如磁盘满),积压过久会导致队列无法接收新消息,生产者报错,上游业务瘫痪;
  • 消费雪崩:积压的消息过多时,若消费者突然恢复,会一次性拉取大量消息,导致消费者线程池满、CPU飙升、服务宕机,积压问题进一步恶化;
  • 数据不一致:比如订单消息积压,下游库存服务无法及时扣减库存,可能导致超卖、漏卖,后续需要大量人力复盘对账。

1.3 积压排查流程图(必用,快速定位原因)

遇到积压问题,先不要急着解决,先排查原因——用下面这个流程图,3分钟就能定位到积压的核心原因,避免盲目操作:

二、紧急止血:30分钟内快速缓解积压(优先保业务)

当百万消息积压几小时,核心诉求是“快速缓解,避免业务进一步恶化”——这一步的核心思路是:暂时切断非核心压力,最大化提升消费能力,快速消化积压消息,相当于“先找临时快递员,把堆积的快递先拉走一部分,缓解快递柜压力”。

紧急止血的操作优先级:暂停非核心生产者 → 临时扩容消费者 → 消息分流 → 跳过无效消息,四步走,30分钟内就能让积压量快速下降,具体操作如下(附实例)。

2.1 第一步:暂停非核心生产者(减少压力源)

积压的核心是“生产>消费”,所以第一步要做的是“减少生产”——暂停非核心业务的生产者,只保留核心业务的消息生产,避免积压量持续增加。

2.1.1 操作场景

比如电商系统中,百万订单消息积压,此时可以暂停“订单评价、物流通知”等非核心业务的生产者,只保留“订单创建、支付回调”等核心业务的生产者,让消费端集中精力处理核心消息。

2.1.2 实例

基于JDK17、SpringBoot3.2.3、Swagger3,实现生产者的动态启停,通过接口控制,无需重启服务,符合生产环境使用规范:

package com.jam.demo.producer.controller;

import com.jam.demo.producer.service.OrderProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
* 生产者控制接口(用于动态启停非核心生产者)
* @author ken
*/

@RestController
@RequestMapping("/producer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ProducerControlController", description = "生产者动态控制接口")
public class ProducerControlController {

   private final OrderProducerService orderProducerService;

   /**
    * 启停非核心生产者(如订单评价、物流通知)
    * @param enable true-启动,false-暂停
    * @return 操作结果
    */

   @PostMapping("/nonCore/enable")
   @Operation(summary = "启停非核心生产者", description = "控制非核心业务的消息生产,缓解积压")
   public String enableNonCoreProducer(@RequestParam Boolean enable) {
       if (ObjectUtils.isEmpty(enable)) {
           log.error("启停参数不能为空");
           return "参数错误:enable不能为空";
       }
       orderProducerService.setNonCoreEnable(enable);
       String result = enable ? "非核心生产者已启动" : "非核心生产者已暂停";
       log.info(result);
       return result;
   }

   /**
    * 查看非核心生产者状态
    * @return 状态描述
    */

   @PostMapping("/nonCore/status")
   @Operation(summary = "查看非核心生产者状态", description = "获取当前非核心生产者的启停状态")
   public String getNonCoreProducerStatus() {
       boolean enable = orderProducerService.getNonCoreEnable();
       return enable ? "非核心生产者正在运行" : "非核心生产者已暂停";
   }
}

package com.jam.demo.producer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.NonCoreMessage;
import com.jam.demo.producer.config.RocketMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 订单生产者服务(核心+非核心)
* @author ken
*/

@Service
@Slf4j
public class OrderProducerService {

   @Resource
   private RocketMQTemplate rocketMQTemplate;

   /**
    * 非核心生产者开关(原子类保证线程安全)
    */

   private final AtomicBoolean nonCoreEnable = new AtomicBoolean(true);

   /**
    * 发送核心消息(订单创建、支付回调)
    * @param message 核心消息实体
    */

   public void sendCoreMessage(Object message) {
       if (ObjectUtils.isEmpty(message)) {
           log.error("核心消息不能为空");
           return;
       }
       try {
           SendResult sendResult = rocketMQTemplate.syncSend(
                   RocketMQConfig.CORE_ORDER_TOPIC,
                   JSON.toJSONString(message),
                   3000
           );
           if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
               log.info("核心消息发送成功,消息ID:{}", sendResult.getMsgId());
           } else {
               log.error("核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), message);
           }
       } catch (Exception e) {
           log.error("核心消息发送异常,消息内容:{}", message, e);
           // 核心消息发送失败,可触发重试(避免核心数据丢失)
           retrySendCoreMessage(message);
       }
   }

   /**
    * 发送非核心消息(订单评价、物流通知)
    * @param nonCoreMessage 非核心消息实体
    */

   public void sendNonCoreMessage(NonCoreMessage nonCoreMessage) {
       // 若开关关闭,直接返回,不发送非核心消息
       if (!nonCoreEnable.get()) {
           log.warn("非核心生产者已暂停,消息暂不发送:{}", nonCoreMessage);
           return;
       }
       if (ObjectUtils.isEmpty(nonCoreMessage)) {
           log.error("非核心消息不能为空");
           return;
       }
       try {
           SendResult sendResult = rocketMQTemplate.syncSend(
                   RocketMQConfig.NON_CORE_ORDER_TOPIC,
                   JSON.toJSONString(nonCoreMessage),
                   2000
           );
           if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
               log.info("非核心消息发送成功,消息ID:{}", sendResult.getMsgId());
           } else {
               log.error("非核心消息发送失败,状态:{},消息内容:{}", sendResult.getSendStatus(), nonCoreMessage);
           }
       } catch (Exception e) {
           log.error("非核心消息发送异常,消息内容:{}", nonCoreMessage, e);
           // 非核心消息发送失败,无需重试,避免增加积压压力
       }
   }

   /**
    * 核心消息重试发送(最多重试3次)
    * @param message 核心消息实体
    */

   private void retrySendCoreMessage(Object message) {
       int retryCount = 0;
       while (retryCount < 3) {
           try {
               SendResult sendResult = rocketMQTemplate.syncSend(
                       RocketMQConfig.CORE_ORDER_TOPIC,
                       JSON.toJSONString(message),
                       3000
               );
               if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                   log.info("核心消息重试发送成功,重试次数:{},消息ID:{}", retryCount + 1, sendResult.getMsgId());
                   return;
               }
           } catch (Exception e) {
               log.error("核心消息重试发送异常,重试次数:{},消息内容:{}", retryCount + 1, message, e);
           }
           retryCount++;
           // 重试间隔:100ms,避免频繁重试占用资源
           try {
               Thread.sleep(100);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               log.error("核心消息重试间隔异常", e);
               break;
           }
       }
       log.error("核心消息重试3次均失败,消息内容:{},请人工处理", message);
   }

   /**
    * 设置非核心生产者开关状态
    * @param enable 开关状态
    */

   public void setNonCoreEnable(Boolean enable) {
       nonCoreEnable.set(enable);
   }

   /**
    * 获取非核心生产者开关状态
    * @return 开关状态
    */

   public boolean getNonCoreEnable() {
       return nonCoreEnable.get();
   }
}

2.1.3 关键说明

  • 用AtomicBoolean保证非核心生产者开关的线程安全,避免多线程操作导致状态错乱;
  • 核心消息发送失败会重试3次(间隔100ms),非核心消息不重试,避免增加积压压力;
  • 通过Swagger3接口动态控制,访问http://localhost:8080/swagger-ui/index.html即可操作,无需重启服务;
  • 依赖配置(pom.xml,仅展示核心依赖,完整依赖见文末):

<!-- SpringBoot父依赖 -->
<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>3.2.3</version>
   <relativePath/>
</parent>

<!-- JDK版本 -->
<properties>
   <java.version>17</java.version>
   <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
   <mybatis-plus-boot-starter.version>3.5.5.1</mybatis-plus-boot-starter.version>
   <fastjson2.version>2.0.39</fastjson2.version>
   <lombok.version>1.18.30</lombok.version>
   <swagger.version>2.2.0</swagger.version>
</properties>

<!-- 核心依赖 -->
<dependencies>
   <!-- SpringBoot Web -->
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>

   <!-- Lombok -->
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <version>${lombok.version}</version>
       <scope>provided</scope>
   </dependency>

   <!-- RocketMQ -->
   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-spring-boot-starter</artifactId>
       <version>${rocketmq-spring-boot-starter.version}</version>
   </dependency>

   <!-- FastJSON2 -->
   <dependency>
       <groupId>com.alibaba.fastjson2</groupId>
       <artifactId>fastjson2</artifactId>
       <version>${fastjson2.version}</version>
   </dependency>

   <!-- MyBatisPlus -->
   <dependency>
       <groupId>com.baomidou</groupId>
       <artifactId>mybatis-plus-boot-starter</artifactId>
       <version>${mybatis-plus-boot-starter.version}</version>
   </dependency>

   <!-- MySQL驱动 -->
   <dependency>
       <groupId>com.mysql</groupId>
       <artifactId>mysql-connector-j</artifactId>
       <scope>runtime</scope>
   </dependency>

   <!-- Swagger3 -->
   <dependency>
       <groupId>org.springdoc</groupId>
       <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
       <version>${swagger.version}</version>
   </dependency>
</dependencies>

2.2 第二步:临时扩容消费者(最大化提升消费能力)

暂停非核心生产者后,下一步是“提升消费速度”——临时扩容消费者,让更多的“取件人”一起处理积压的消息,这是紧急止血最有效的手段。

2.2.1 扩容的核心原则

  1. 消费者数量 ≤ 队列分区数:大部分消息队列(Kafka、RocketMQ)的消费模型是“一个分区只能被一个消费者组的一个消费者消费”,如果消费者数量超过分区数,多余的消费者会处于空闲状态,无法提升消费速度;
  2. 扩容时避免重复消费:通过消息队列的偏移量(offset)机制,确保扩容后的消费者能从正确的位置开始消费,避免重复消费(比如RocketMQ的广播模式需谨慎,优先用集群模式);
  3. 临时扩容用容器化部署:生产环境中,用Docker+K8s快速扩容消费者实例,积压缓解后再缩容,避免资源浪费。

2.2.2 实例(动态调整消费者线程数,无需重启服务)

基于SpringBoot+RocketMQ,实现消费者线程数的动态调整,结合Swagger3接口,无需重启服务,快速提升消费能力:

package com.jam.demo.consumer.controller;

import com.jam.demo.consumer.service.OrderConsumerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
* 消费者控制接口(动态调整消费线程数)
* @author ken
*/

@RestController
@RequestMapping("/consumer/control")
@Slf4j
@RequiredArgsConstructor
@Tag(name = "ConsumerControlController", description = "消费者动态控制接口")
public class ConsumerControlController {

   private final OrderConsumerService orderConsumerService;

   /**
    * 动态调整核心消息消费者线程数
    * @param threadNum 线程数(1-50,根据服务器性能调整)
    * @return 操作结果
    */

   @PostMapping("/core/threadNum")
   @Operation(summary = "调整核心消费者线程数", description = "动态调整核心消息(订单创建、支付回调)的消费线程数,提升消费速度")
   public String adjustCoreConsumerThreadNum(@RequestParam Integer threadNum) {
       if (ObjectUtils.isEmpty(threadNum) || threadNum < 1 || threadNum > 50) {
           log.error("线程数参数错误,线程数必须在1-50之间,当前参数:{}", threadNum);
           return "参数错误:线程数必须为1-50之间的整数";
       }
       orderConsumerService.setCoreConsumerThreadNum(threadNum);
       log.info("核心消费者线程数已调整为:{}", threadNum);
       return "核心消费者线程数调整成功,当前线程数:" + threadNum;
   }

   /**
    * 查看核心消费者线程数
    * @return 线程数描述
    */

   @PostMapping("/core/threadNum/status")
   @Operation(summary = "查看核心消费者线程数", description = "获取当前核心消息消费者的线程数")
   public String getCoreConsumerThreadNum() {
       int threadNum = orderConsumerService.getCoreConsumerThreadNum();
       return "当前核心消费者线程数:" + threadNum;
   }
}

package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 核心消息消费者服务(订单创建、支付回调)
* @author ken
*/

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderConsumerService implements RocketMQListener<String> {

   private final OrderMapper orderMapper;
   private final PlatformTransactionManager transactionManager;

   @Resource
   private ThreadPoolTaskExecutor consumerThreadPool;

   /**
    * 核心消费者线程数(原子类保证线程安全)
    */

   private final AtomicInteger coreConsumerThreadNum = new AtomicInteger(10);

   /**
    * 初始化消费者线程池(默认10线程)
    */

   @PostConstruct
   public void initThreadPool() {
       consumerThreadPool.setCorePoolSize(coreConsumerThreadNum.get());
       consumerThreadPool.setMaxPoolSize(coreConsumerThreadNum.get() + 5);
       consumerThreadPool.setQueueCapacity(1000);
       log.info("核心消费者线程池初始化完成,默认线程数:{}", coreConsumerThreadNum.get());
   }

   /**
    * 消费核心消息(RocketMQ监听方法)
    * @param message 消息内容(JSON格式)
    */

   @Override
   @RocketMQMessageListener(
           topic = RocketMQConfig.CORE_ORDER_TOPIC,
           consumerGroup = RocketMQConfig.CORE_ORDER_CONSUMER_GROUP,
           // 广播模式:所有消费者都消费同一消息;集群模式:消息只被一个消费者消费(优先集群模式,避免重复消费)
           messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
           // 批量消费:每次拉取10条消息,提升消费速度(根据业务调整)
           consumeMessageBatchMaxSize = 10,
           // 消费超时时间:30秒(避免消费耗时过长导致消息重试)
           consumeTimeout = 30
   )
   public void onMessage(String message) {
       if (ObjectUtils.isEmpty(message)) {
           log.error("消费核心消息失败:消息内容为空");
           return;
       }
       // 提交到线程池异步消费,提升消费速度
       consumerThreadPool.execute(() -> processCoreMessage(message));
   }

   /**
    * 处理核心消息(业务逻辑+编程式事务)
    * @param message 消息内容(JSON格式)
    */

   private void processCoreMessage(String message) {
       // 编程式事务(保证消息消费与数据库操作的一致性)
       DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
       TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

       try {
           // 解析消息(FastJSON2)
           CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
           if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
               log.error("解析核心消息失败:消息格式错误,消息内容:{}", message);
               // 事务回滚
               transactionManager.rollback(transactionStatus);
               return;
           }

           Order order = coreMessage.getOrder();
           log.info("开始消费核心消息,订单ID:{},消息内容:{}", order.getOrderId(), message);

           // 业务逻辑:更新订单状态(模拟核心业务)
           LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                   .eq(Order::getOrderId, order.getOrderId())
                   .set(Order::getStatus, order.getStatus())
                   .set(Order::getUpdateTime, System.currentTimeMillis())
;

           int updateCount = orderMapper.update(null, updateWrapper);
           if (updateCount == 0) {
               log.error("消费核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
               transactionManager.rollback(transactionStatus);
               return;
           }

           // 事务提交
           transactionManager.commit(transactionStatus);
           log.info("消费核心消息成功,订单ID:{}", order.getOrderId());
       } catch (Exception e) {
           log.error("消费核心消息异常,消息内容:{}", message, e);
           // 事务回滚
           transactionManager.rollback(transactionStatus);
           // 消息消费失败,抛出异常,触发RocketMQ重试(重试次数由队列配置决定)
           throw new RuntimeException("消费核心消息异常,触发重试", e);
       }
   }

   /**
    * 设置核心消费者线程数(动态调整)
    * @param threadNum 线程数
    */

   public void setCoreConsumerThreadNum(Integer threadNum) {
       coreConsumerThreadNum.set(threadNum);
       // 调整线程池参数
       consumerThreadPool.setCorePoolSize(threadNum);
       consumerThreadPool.setMaxPoolSize(threadNum + 5);
   }

   /**
    * 获取核心消费者线程数
    * @return 线程数
    */

   public int getCoreConsumerThreadNum() {
       return coreConsumerThreadNum.get();
   }
}

2.2.3 线程池配置(SpringBoot配置类)

package com.jam.demo.consumer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* 消费者线程池配置类
* @author ken
*/

@Configuration
public class ThreadPoolConfig {

   /**
    * 核心消息消费者线程池
    * @return 线程池实例
    */

   @Bean
   public ThreadPoolTaskExecutor consumerThreadPool() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       // 线程名称前缀(便于日志排查)
       executor.setThreadNamePrefix("core-consumer-thread-");
       // 拒绝策略:丢弃任务并抛出异常(核心消息消费失败需及时感知)
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
       // 线程空闲时间:60秒
       executor.setKeepAliveSeconds(60);
       // 等待所有任务完成后再关闭线程池
       executor.setWaitForTasksToCompleteOnShutdown(true);
       // 等待时间:30秒(超时强制关闭)
       executor.setAwaitTerminationSeconds(30);
       return executor;
   }
}

2.2.4 关键说明

  1. 用ThreadPoolTaskExecutor创建消费者线程池,动态调整核心线程数,无需重启服务;
  2. 采用编程式事务(PlatformTransactionManager),保证消息消费与数据库操作的一致性,避免数据不一致;
  3. RocketMQ监听配置:集群模式(避免重复消费)、批量消费(每次拉取10条)、消费超时30秒,符合生产环境最佳实践;
  4. 线程池拒绝策略用AbortPolicy(丢弃任务并抛异常),核心消息消费失败需及时感知,避免无声失败;
  5. 扩容建议:如果队列分区数为20,可将消费者线程数调整为20(最大化利用分区),同时用K8s扩容2-3个消费者实例,进一步提升消费速度。

2.3 第三步:消息分流(避免核心消息被阻塞)

如果积压的消息中,有大量非核心消息(比如日志、通知),会阻塞核心消息的消费——此时需要将核心消息和非核心消息分流,让核心消息优先被消费,非核心消息后续慢慢处理。

2.3.1 分流的核心思路

  1. 创建临时队列:新建一个临时消息队列(比如CORE_ORDER_TOPIC_TEMP),用于接收核心消息;
  2. 消息转移:将原队列中的核心消息,批量转移到临时队列,让临时消费者组专门处理;
  3. 原队列清理:将原队列中的非核心消息暂停消费,或批量删除(如果无需处理),减少积压压力。

2.3.2 实例(RocketMQ消息批量转移)

基于RocketMQ的Admin API,实现消息批量转移,将原队列中的核心消息转移到临时队列,可直接运行:

package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.consumer.config.RocketMQConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.admin.MQAdmin;
import org.apache.rocketmq.client.admin.MQAdminExt;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;

/**
* 消息分流服务(核心消息转移到临时队列)
* @author ken
*/

@Service
@Slf4j
@RequiredArgsConstructor
public class MessageShuntService {

   @Resource
   private MQAdminExt mqAdmin;

   private DefaultMQPullConsumer pullConsumer;
   private DefaultMQProducer pushProducer;

   /**
    * 初始化拉取消费者和推送生产者(用于消息转移)
    */

   @PostConstruct
   public void init() throws MQClientException {
       // 初始化拉取消费者(拉取原队列中的消息)
       pullConsumer = new DefaultMQPullConsumer("message_shunt_pull_consumer_group");
       pullConsumer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
       pullConsumer.start();
       log.info("消息分流-拉取消费者初始化完成");

       // 初始化推送生产者(将核心消息推送到临时队列)
       pushProducer = new DefaultMQProducer("message_shunt_push_producer_group");
       pushProducer.setNamesrvAddr(RocketMQConfig.NAMESRV_ADDR);
       pushProducer.start();
       log.info("消息分流-推送生产者初始化完成");
   }

   /**
    * 消息分流:将原核心队列中的核心消息,转移到临时队列
    * @param batchSize 每次拉取的消息数量(批量处理,提升效率)
    * @return 转移结果(转移成功数量、失败数量)
    */

   public String shuntCoreMessage(int batchSize) {
       if (batchSize <= 0 || batchSize > 100) {
           log.error("消息分流失败:批量大小错误,必须为1-100之间的整数,当前参数:{}", batchSize);
           return "参数错误:批量大小必须为1-100之间的整数";
       }

       int successCount = 0;
       int failCount = 0;

       try {
           // 1. 获取原核心队列的所有消息队列(分区)
           Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(RocketMQConfig.CORE_ORDER_TOPIC);
           if (CollectionUtils.isEmpty(messageQueues)) {
               log.error("消息分流失败:未找到原核心队列的分区");
               return "消息分流失败:未找到原核心队列的分区";
           }

           // 2. 遍历每个分区,拉取消息并转移
           for (MessageQueue messageQueue : messageQueues) {
               log.info("开始处理分区:{},队列名称:{}", messageQueue.getQueueId(), messageQueue.getTopic());

               // 获取分区的最小偏移量(从最开始拉取,避免遗漏)
               long offset = pullConsumer.minOffset(messageQueue);
               log.info("分区{}的最小偏移量:{}", messageQueue.getQueueId(), offset);

               while (true) {
                   // 批量拉取消息(每次拉取batchSize条)
                   PullResult pullResult = pullConsumer.pull(
                           messageQueue,
                           "*", // 订阅所有tag
                           offset,
                           batchSize
                   );

                   // 判断拉取状态
                   if (PullStatus.FOUND.equals(pullResult.getPullStatus())) {
                       List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                       if (!CollectionUtils.isEmpty(messageExtList)) {
                           for (MessageExt messageExt : messageExtList) {
                               try {
                                   // 解析消息,判断是否为核心消息(这里模拟核心消息包含order字段)
                                   String messageBody = new String(messageExt.getBody(), "UTF-8");
                                   CoreMessage coreMessage = JSON.parseObject(messageBody, CoreMessage.class);
                                   if (!ObjectUtils.isEmpty(coreMessage) && !ObjectUtils.isEmpty(coreMessage.getOrder())) {
                                       // 是核心消息,转移到临时队列
                                       Message tempMessage = new Message(
                                               RocketMQConfig.CORE_ORDER_TOPIC_TEMP, // 临时队列topic
                                               messageExt.getTags(),
                                               messageExt.getKeys(),
                                               messageBody.getBytes("UTF-8")
                                       );
                                       // 发送到临时队列
                                       SendResult sendResult = pushProducer.send(tempMessage, 3000);
                                       if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                                           successCount++;
                                           log.info("消息转移成功,消息ID:{},分区:{}", sendResult.getMsgId(), messageQueue.getQueueId());
                                       } else {
                                           failCount++;
                                           log.error("消息转移失败,消息ID:{},发送状态:{}", messageExt.getMsgId(), sendResult.getSendStatus());
                                       }
                                   } else {
                                       // 非核心消息,跳过(可根据业务需求删除或暂存)
                                       log.warn("非核心消息,跳过转移,消息内容:{}", messageBody);
                                   }
                               } catch (Exception e) {
                                   failCount++;
                                   log.error("消息转移异常,消息ID:{}", messageExt.getMsgId(), e);
                               }
                           }
                       }
                       // 更新偏移量,继续拉取下一批
                       offset = pullResult.getNextBeginOffset();
                   } else if (PullStatus.NO_NEW_MSG.equals(pullResult.getPullStatus())) {
                       // 该分区没有新消息,退出循环
                       log.info("分区{}没有新消息,退出处理", messageQueue.getQueueId());
                       break;
                   } else if (PullStatus.OFFSET_ILLEGAL.equals(pullResult.getPullStatus())) {
                       // 偏移量非法,重置为最小偏移量
                       offset = pullConsumer.minOffset(messageQueue);
                       log.warn("分区{}偏移量非法,重置为最小偏移量:{}", messageQueue.getQueueId(), offset);
                   }
               }
           }

           return String.format("消息分流完成,转移成功:%d条,转移失败:%d条", successCount, failCount);
       } catch (Exception e) {
           log.error("消息分流整体异常", e);
           return String.format("消息分流异常,已转移成功:%d条,失败:%d条,异常信息:%s", successCount, failCount, e.getMessage());
       }
   }

   /**
    * 关闭拉取消费者和推送生产者(用于服务停止时释放资源)
    */

   public void close() {
       if (pullConsumer != null) {
           pullConsumer.shutdown();
           log.info("消息分流-拉取消费者已关闭");
       }
       if (pushProducer != null) {
           pushProducer.shutdown();
           log.info("消息分流-推送生产者已关闭");
       }
   }
}

2.3.3 临时队列消费者配置(专门消费转移后的核心消息)

package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

/**
* 临时队列消费者服务(消费转移后的核心消息)
* @author ken
*/

@Service
@Slf4j
@RequiredArgsConstructor
public class TempCoreMessageConsumer implements RocketMQListener<String> {

   private final OrderMapper orderMapper;
   private final PlatformTransactionManager transactionManager;

   /**
    * 消费临时队列中的核心消息
    * @param message 消息内容(JSON格式)
    */

   @Override
   @RocketMQMessageListener(
           topic = RocketMQConfig.CORE_ORDER_TOPIC_TEMP,
           consumerGroup = RocketMQConfig.CORE_ORDER_TEMP_CONSUMER_GROUP,
           messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
           consumeMessageBatchMaxSize = 15, // 临时队列,批量消费数量增加到15,加快消费
           consumeTimeout = 20
   )
   public void onMessage(String message) {
       if (ObjectUtils.isEmpty(message)) {
           log.error("消费临时队列消息失败:消息内容为空");
           return;
       }

       // 编程式事务
       DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
       TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

       try {
           CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
           if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder())) {
               log.error("解析临时队列消息失败:消息格式错误,消息内容:{}", message);
               transactionManager.rollback(transactionStatus);
               return;
           }

           Order order = coreMessage.getOrder();
           log.info("开始消费临时队列核心消息,订单ID:{}", order.getOrderId());

           // 业务逻辑:更新订单状态(与核心消费者一致)
           LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                   .eq(Order::getOrderId, order.getOrderId())
                   .set(Order::getStatus, order.getStatus())
                   .set(Order::getUpdateTime, System.currentTimeMillis())
;

           int updateCount = orderMapper.update(null, updateWrapper);
           if (updateCount == 0) {
               log.error("消费临时队列消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
               transactionManager.rollback(transactionStatus);
               return;
           }

           transactionManager.commit(transactionStatus);
           log.info("消费临时队列核心消息成功,订单ID:{}", order.getOrderId());
       } catch (Exception e) {
           log.error("消费临时队列消息异常,消息内容:{}", message, e);
           transactionManager.rollback(transactionStatus);
           throw new RuntimeException("消费临时队列消息异常,触发重试", e);
       }
   }
}

2.3.4 关键说明

  1. 用MQAdminExt、DefaultMQPullConsumer拉取原队列消息,DefaultMQProducer将核心消息推送到临时队列,实现消息分流;
  2. 批量拉取消息(每次1-100条),提升分流效率,避免单条拉取耗时过长;
  3. 临时队列的批量消费数量设置为15(高于原队列的10),加快核心消息的消费速度;
  4. 非核心消息可根据业务需求,选择跳过、删除或暂存到其他队列,避免阻塞核心消息。

2.4 第四步:跳过无效消息(减少无效消费)

如果积压的消息中,有大量无效消息(比如重复消息、过期消息、格式错误消息),这些消息会占用消费资源,导致有效消息消费变慢——此时需要跳过无效消息,让消费者只处理有效消息。

2.4.1 无效消息的判断标准(根据业务定义)

  1. 过期消息:消息的创建时间超过业务允许的有效期(比如订单消息超过24小时,无需处理);
  2. 重复消息:消息ID重复(通过Redis或数据库记录已消费的消息ID,避免重复处理);
  3. 格式错误消息:消息JSON解析失败,或缺少核心字段(比如订单消息缺少订单ID)。

2.4.2 实例(跳过无效消息,结合Redis去重)

基于Redis(最新稳定版7.2.4),实现消息去重和无效消息过滤,可直接编译运行:

package com.jam.demo.consumer.service;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.common.entity.CoreMessage;
import com.jam.demo.common.entity.Order;
import com.jam.demo.consumer.mapper.OrderMapper;
import com.jam.demo.consumer.config.RocketMQConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
* 核心消息消费者(带无效消息过滤+Redis去重)
* @author ken
*/

@Service
@Slf4j
@RequiredArgsConstructor
public class FilteredOrderConsumerService implements RocketMQListener<String> {

   private final OrderMapper orderMapper;
   private final PlatformTransactionManager transactionManager;

   @Resource
   private StringRedisTemplate stringRedisTemplate;

   /**
    * Redis key前缀(已消费消息ID)
    */

   private static final String CONSUMED_MESSAGE_KEY_PREFIX = "consumer:core:message:consumed:";

   /**
    * 消息有效期(24小时,单位:毫秒)
    */

   private static final long MESSAGE_VALID_TIME = 24 * 60 * 60 * 1000;

   /**
    * 消费核心消息(带过滤无效消息)
    * @param message 消息内容(JSON格式)
    */

   @Override
   @RocketMQMessageListener(
           topic = RocketMQConfig.CORE_ORDER_TOPIC,
           consumerGroup = RocketMQConfig.FILTERED_CORE_ORDER_CONSUMER_GROUP,
           messageModel = org.apache.rocketmq.common.message.MessageModel.CLUSTERING,
           consumeMessageBatchMaxSize = 12,
           consumeTimeout = 25
   )
   public void onMessage(String message) {
       if (ObjectUtils.isEmpty(message)) {
           log.error("消费核心消息失败:消息内容为空(无效消息)");
           return;
       }

       try {
           // 1. 解析消息基本信息
           CoreMessage coreMessage = JSON.parseObject(message, CoreMessage.class);
           if (ObjectUtils.isEmpty(coreMessage) || ObjectUtils.isEmpty(coreMessage.getOrder()) || ObjectUtils.isEmpty(coreMessage.getMsgId())) {
               log.error("消费核心消息失败:消息格式错误(无效消息),消息内容:{}", message);
               return;
           }

           String msgId = coreMessage.getMsgId();
           Order order = coreMessage.getOrder();
           long messageCreateTime = coreMessage.getCreateTime();

           // 2. 过滤无效消息
           // 2.1 过滤过期消息(创建时间超过24小时)
           if (System.currentTimeMillis() - messageCreateTime > MESSAGE_VALID_TIME) {
               log.warn("跳过过期消息,消息ID:{},订单ID:{},创建时间:{}", msgId, order.getOrderId(), messageCreateTime);
               return;
           }

           // 2.2 过滤重复消息(Redis去重,过期时间24小时)
           String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
           Boolean isConsumed = stringRedisTemplate.hasKey(redisKey);
           if (Boolean.TRUE.equals(isConsumed)) {
               log.warn("跳过重复消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());
               return;
           }

           // 3. 有效消息,开始处理(编程式事务)
           DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
           TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

           try {
               log.info("开始消费有效核心消息,消息ID:{},订单ID:{}", msgId, order.getOrderId());

               // 业务逻辑:更新订单状态
               LambdaUpdateWrapper<Order> updateWrapper = Wrappers.lambdaUpdate(Order.class)
                       .eq(Order::getOrderId, order.getOrderId())
                       .set(Order::getStatus, order.getStatus())
                       .set(Order::getUpdateTime, System.currentTimeMillis())
;

               int updateCount = orderMapper.update(null, updateWrapper);
               if (updateCount == 0) {
                   log.error("消费有效核心消息失败:未找到对应订单,订单ID:{}", order.getOrderId());
                   transactionManager.rollback(transactionStatus);
                   return;
               }

               // 事务提交
               transactionManager.commit(transactionStatus);
               log.info("消费有效核心消息成功,订单ID:{}", order.getOrderId());

               // 记录已消费消息ID到Redis,设置24小时过期(避免重复消费)
               stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
           } catch (Exception e) {
               log.error("消费有效核心消息异常,消息ID:{},订单ID:{}", msgId, order.getOrderId(), e);
               transactionManager.rollback(transactionStatus);
               throw new RuntimeException("消费有效核心消息异常,触发重试", e);
           }
       } catch (Exception e) {
           log.error("过滤/消费核心消息整体异常,消息内容:{}", message, e);
       }
   }
}

2.4.3 关键说明

  1. Redis去重逻辑:用消息唯一ID(msgId)作为Redis Key,值为固定标识,过期时间24小时(与消息有效期一致),避免永久占用Redis内存;
  2. 无效消息过滤优先级:先判断消息是否为空→再解析格式→再过滤过期→最后过滤重复,层层筛选,只处理有效消息;
  3. 异常隔离:过滤阶段的异常不触发消息重试(比如格式错误、过期),只有有效消息处理失败才触发重试,避免无效消息反复重试占用消费资源;
  4. Redis依赖配置(pom.xml):

<!-- Redis -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis连接池(提升性能) -->
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-pool2</artifactId>
</dependency>

2.5 紧急止血效果验证(必做,确认积压缓解)

紧急止血操作完成后,需通过3个维度验证效果,避免“假缓解”:

  1. 队列监控维度:查看消息队列的积压数量(Offset差值)是否持续下降,生产TPS ≤ 消费TPS;
  2. 消费端维度:查看消费者线程池利用率(80%左右为宜)、消费成功率(≥99%)、无大量重试日志;
  3. 业务维度:下游业务(如订单状态更新、库存扣减)恢复正常,无数据不一致问题。

三、根源排查:找到积压的“真凶”(避免重复踩坑)

紧急止血只是“治标”,只有找到积压的根本原因,才能“治本”。结合前文的排查流程图,从消费端、生产端、队列配置三个维度逐一排查,每个维度都有明确的排查方法和验证手段。

3.1 消费端问题排查(占比80%,最常见)

消费端是积压的最主要原因,核心排查方向:

3.1.1 消费服务是否存活

  • 排查方法:通过K8s/Docker查看消费者实例是否运行、端口是否存活、有无OOM/Kill日志;
  • 验证命令(Linux):

# 查看消费者进程
ps -ef | grep core-consumer
# 查看端口占用
netstat -tulpn | grep 8081
# 查看OOM日志
dmesg | grep -i oom | grep core-consumer

  • 解决方案:重启挂掉的实例、扩容实例、调整JVM参数(如-Xmx4g -Xms4g)避免OOM。

3.1.2 消费线程池是否阻塞

  • 排查方法:通过Arthas(阿里开源诊断工具)查看线程池状态(活跃线程数、队列长度、拒绝次数);
  • 验证命令(Arthas):

# 启动Arthas
java -jar arthas-boot.jar
# 选择消费者进程
# 查看线程池信息
thread -n 10 # 查看最繁忙的10个线程
# 查看线程池详细参数
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getCorePoolSize()'
ognl '@com.jam.demo.consumer.config.ThreadPoolConfig@consumerThreadPool.getActiveCount()'

  • 解决方案:动态扩容线程池、优化阻塞的业务逻辑(如慢SQL、远程调用超时)。

3.1.3 业务逻辑耗时过长

  • 排查方法:通过日志/链路追踪(SkyWalking)查看消费方法的耗时,定位慢操作(如慢SQL、第三方接口超时);
  • 慢SQL排查(MySQL8.0):

-- 开启慢查询日志
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- 耗时1秒以上的SQL记录
-- 查看慢查询日志
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
-- 分析SQL执行计划
EXPLAIN UPDATE `order` SET status = 2, update_time = 1740000000000 WHERE order_id = '123456';

  • 解决方案:优化慢SQL(加索引、分库分表)、设置远程调用超时(如Feign设置1秒超时)、异步处理非核心业务。

3.2 生产端问题排查(占比15%)

生产端问题主要是“生产突增”或“重复生产”:

3.2.1 生产TPS突增

  • 排查方法:查看生产者监控(如Prometheus+Grafana),确认是否有大促、秒杀等峰值流量;
  • 解决方案:限流(如Sentinel)、削峰(如生产者端增加本地队列)、降级非核心生产业务。

3.2.2 重复生产

  • 排查方法:查看生产者日志,确认是否有“发送失败-重试”循环,或消息ID重复;
  • 解决方案:生产者端增加幂等性(如基于订单ID去重)、设置合理的重试次数(如3次)、避免无限重试。

3.3 队列配置问题排查(占比5%)

队列配置问题容易被忽略,但会导致“扩容无效”:

3.3.1 分区数不足

  • 排查方法:查看队列分区数(如RocketMQ):

# RocketMQ查看Topic分区数
sh mqadmin topicStatus -n 127.0.0.1:9876 -t CORE_ORDER_TOPIC

  • 解决方案:扩容分区数(注意:RocketMQ/Kafka分区数扩容后,需重新分配消费者)。

3.3.2 拉取策略不合理

  • 排查方法:查看消费者拉取配置(如批量拉取数、拉取间隔);
  • 解决方案:调整批量拉取数(如从10增加到20)、缩短拉取间隔(如从500ms缩短到200ms)。

四、彻底解决:从架构层面避免积压(治本)

找到根源后,从架构、配置、代码三个层面优化,彻底避免百万消息积压问题,核心思路是“提升消费能力、控制生产速度、增加容错机制”。

4.1 架构层面优化

4.1.1 消费者集群化+弹性伸缩

  • 方案:基于K8s实现消费者的弹性伸缩,根据积压数量自动扩容/缩容;
  • 配置示例(K8s HPA):

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
 name: core-consumer-hpa
spec:
 scaleTargetRef:
   apiVersion: apps/v1
   kind: Deployment
   name: core-consumer
 minReplicas: 2
 maxReplicas: 10
 metrics:
 - type: External
   external:
     metric:
       name: rocketmq_topic_backlog
       selector:
         matchLabels:
           topic: CORE_ORDER_TOPIC
     target:
       type: Value
       value: 10000 # 积压数超过1万,自动扩容

4.1.2 消息分级+优先级消费

  • 方案:将消息分为核心(订单、支付)、普通(物流)、低优先级(日志),不同优先级消息使用不同队列,核心队列配置更多分区和消费者;
  • 架构图:

4.2 配置层面优化

4.2.1 合理设置队列参数

参数 推荐值(RocketMQ) 说明
批量消费数 10-20 提升消费效率,避免单条消费
消费超时时间 20-30秒 避免消费耗时过长导致重试
重试次数 3次 避免无限重试增加积压
死信队列开启 失败消息进入死信,避免阻塞

4.2.2 消费者线程池配置

  • 核心线程数 = CPU核心数 * 2(如8核CPU,核心线程数16);
  • 最大线程数 = 核心线程数 + 5;
  • 队列容量 = 1000(避免队列过长导致阻塞);
  • 拒绝策略 = AbortPolicy(核心消息)/DiscardOldestPolicy(非核心消息)。

4.3 代码层面优化

4.3.1 消费业务异步化

将消费中的非核心业务(如日志、通知)异步处理,减少主流程耗时:

/**
* 异步处理非核心业务(日志记录)
* @author ken
*/

private void asyncProcessNonCoreBusiness(Order order) {
   // 异步线程池(单独配置,不占用核心消费线程)
   nonCoreThreadPool.execute(() -> {
       try {
           logService.recordOrderLog(order.getOrderId(), order.getStatus());
           notificationService.sendOrderNotify(order.getUserId(), order.getOrderId());
       } catch (Exception e) {
           log.error("异步处理非核心业务失败,订单ID:{}", order.getOrderId(), e);
       }
   });
}

4.3.2 消费幂等性保障

基于MySQL+Redis实现双重幂等,避免重复消费导致数据不一致:

/**
* 检查消费幂等性(MySQL+Redis)
* @param msgId 消息ID
* @param orderId 订单ID
* @return true-可消费,false-重复消费
* @author ken
*/

private boolean checkIdempotent(String msgId, String orderId) {
   // 1. Redis快速判断(第一层)
   String redisKey = CONSUMED_MESSAGE_KEY_PREFIX + msgId;
   if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(redisKey))) {
       return false;
   }
   // 2. MySQL数据库判断(第二层,防Redis宕机)
   int count = orderMapper.countConsumedMsg(msgId, orderId);
   if (count > 0) {
       return stringRedisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS); // 同步到Redis
   }
   return true;
}

对应的MyBatisPlus Mapper:

package com.jam.demo.consumer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.common.entity.Order;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

/**
* 订单Mapper
* @author ken
*/

@Repository
public interface OrderMapper extends BaseMapper<Order> {

   /**
    * 检查消息是否已消费(幂等性)
    * @param msgId 消息ID
    * @param orderId 订单ID
    * @return 已消费次数
    */

   int countConsumedMsg(@Param("msgId") String msgId, @Param("orderId") String orderId);
}

对应的Mapper XML(resources/mapper/OrderMapper.xml):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.demo.consumer.mapper.OrderMapper">
   <select id="countConsumedMsg" resultType="java.lang.Integer">
       SELECT COUNT(1) FROM `order_consume_record`
       WHERE msg_id = #{msgId} AND order_id = #{orderId}
   </select>
</mapper>

对应的MySQL表结构(MySQL8.0):

CREATE TABLE `order_consume_record` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
 `msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
 `order_id` varchar(64) NOT NULL COMMENT '订单ID',
 `consume_time` bigint NOT NULL COMMENT '消费时间(毫秒)',
 `consumer_ip` varchar(32) NOT NULL COMMENT '消费者IP',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uk_msg_order` (`msg_id`,`order_id`) COMMENT '消息ID+订单ID唯一索引(幂等性)',
 KEY `idx_order_id` (`order_id`) COMMENT '订单ID索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单消费记录(幂等性)';


五、复盘优化:建立预防机制

解决积压问题后,必须进行复盘,建立“监控-预警-应急”三位一体的预防机制,让积压问题“早发现、早处理、不扩大”。

5.1 完善监控体系

核心监控指标(需配置可视化面板,如Grafana):

维度 核心指标 预警阈值
队列 积压数量、生产TPS、消费TPS 积压>1万触发预警
消费端 消费成功率、消费耗时、线程池利用率 成功率<99%、耗时>2秒
生产端 生产成功率、重试次数 重试次数>100次/分钟
系统 JVM内存、CPU利用率、磁盘使用率 CPU>80%、内存>85%

5.2 配置分级预警

通过钉钉/企业微信配置分级预警,避免信息过载:

  • 一级预警(短信+电话):积压>10万、消费TPS=0、服务宕机;
  • 二级预警(钉钉群):积压>1万、消费成功率<99%、耗时>2秒;
  • 三级预警(日志):积压>5000、生产重试次数增加。

5.3 制定应急手册

将本文的紧急止血流程整理成标准化应急手册,包含:

  1. 应急联系人(开发、运维、DBA);
  2. 操作步骤(暂停非核心生产→扩容消费者→消息分流→跳过无效消息);
  3. 验证方法(监控指标、业务验证);
  4. 回滚方案(扩容后缩容、恢复非核心生产)。

总结

  1. 紧急止血核心:暂停非核心生产减少压力源,扩容消费者+消息分流提升消费能力,跳过无效消息减少资源浪费,30分钟内可快速缓解百万消息积压;
  2. 根源排查重点:80%的积压源于消费端(服务挂掉、线程池阻塞、慢业务),需通过Arthas、慢查询日志、队列监控逐一定位;
  3. 长期优化关键:架构上实现消费者弹性伸缩+消息分级消费,代码上保障幂等性+异步化,运营上建立监控-预警-应急体系,从“治标”到“治本”,彻底避免积压问题。
目录
相关文章
|
14天前
|
人工智能 自然语言处理 Shell
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
28636 100
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
|
4天前
|
应用服务中间件 API 网络安全
3分钟汉化OpenClaw,使用Docker快速部署启动OpenClaw(Clawdbot)教程
2026年全新推出的OpenClaw汉化版,是基于Claude API开发的智能对话系统本土化优化版本,解决了原版英文界面的使用壁垒,实现了界面、文档、指令的全中文适配。该版本采用Docker容器化部署方案,开箱即用,支持Linux、macOS、Windows全平台运行,适配个人、企业、生产等多种使用场景,同时具备灵活的配置选项和强大的扩展能力。本文将从项目简介、部署前准备、快速部署、详细配置、问题排查、监控维护等方面,提供完整的部署与使用指南,文中包含实操代码命令,确保不同技术水平的用户都能快速落地使用。
3207 0
|
10天前
|
人工智能 安全 机器人
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI助手,支持钉钉、飞书等多平台接入。本教程手把手指导Linux下部署与钉钉机器人对接,涵盖环境配置、模型选择(如Qwen)、权限设置及调试,助你快速打造私有、安全、高权限的专属AI助理。(239字)
5548 15
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
|
9天前
|
人工智能 机器人 Linux
OpenClaw(Clawdbot、Moltbot)汉化版部署教程指南(零门槛)
OpenClaw作为2026年GitHub上增长最快的开源项目之一,一周内Stars从7800飙升至12万+,其核心优势在于打破传统聊天机器人的局限,能真正执行读写文件、运行脚本、浏览器自动化等实操任务。但原版全英文界面对中文用户存在上手门槛,汉化版通过覆盖命令行(CLI)与网页控制台(Dashboard)核心模块,解决了语言障碍,同时保持与官方版本的实时同步,确保新功能最快1小时内可用。本文将详细拆解汉化版OpenClaw的搭建流程,涵盖本地安装、Docker部署、服务器远程访问等场景,同时提供环境适配、问题排查与国内应用集成方案,助力中文用户高效搭建专属AI助手。
4026 8
|
11天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
5152 17
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
|
11天前
|
存储 人工智能 机器人
OpenClaw是什么?阿里云OpenClaw(原Clawdbot/Moltbot)一键部署官方教程参考
OpenClaw是什么?OpenClaw(原Clawdbot/Moltbot)是一款实用的个人AI助理,能够24小时响应指令并执行任务,如处理文件、查询信息、自动化协同等。阿里云推出的OpenClaw一键部署方案,简化了复杂配置流程,用户无需专业技术储备,即可快速在轻量应用服务器上启用该服务,打造专属AI助理。本文将详细拆解部署全流程、进阶功能配置及常见问题解决方案,确保不改变原意且无营销表述。
5605 5
|
13天前
|
人工智能 JavaScript 应用服务中间件
零门槛部署本地AI助手:Windows系统Moltbot(Clawdbot)保姆级教程
Moltbot(原Clawdbot)是一款功能全面的智能体AI助手,不仅能通过聊天互动响应需求,还具备“动手”和“跑腿”能力——“手”可读写本地文件、执行代码、操控命令行,“脚”能联网搜索、访问网页并分析内容,“大脑”则可接入Qwen、OpenAI等云端API,或利用本地GPU运行模型。本教程专为Windows系统用户打造,从环境搭建到问题排查,详细拆解全流程,即使无技术基础也能顺利部署本地AI助理。
7478 16
|
13天前
|
人工智能 JavaScript API
零门槛部署本地 AI 助手:Clawdbot/Meltbot 部署深度保姆级教程
Clawdbot(Moltbot)是一款智能体AI助手,具备“手”(读写文件、执行代码)、“脚”(联网搜索、分析网页)和“脑”(接入Qwen/OpenAI等API或本地GPU模型)。本指南详解Windows下从Node.js环境搭建、一键安装到Token配置的全流程,助你快速部署本地AI助理。(239字)
5103 22