1. 场景:电商系统为什么需要消息队列
2025年双十一,我负责的电商平台峰值QPS达到12万。下单成功后,系统需要同步通知库存扣减、物流创建、积分发放、短信推送等8个下游服务。
同步调用链路过长。 下单接口需要串行调用8个下游服务,任何一个超时或失败都会影响下单体验。最坏情况下,一次下单请求耗时超过5秒。
服务耦合严重。 新增一个下游服务(如风控审核)需要修改订单服务代码,重新部署上线。每次变更都有引入Bug的风险。
流量洪峰无法应对。 大促期间流量突增10倍,下游服务扛不住瞬时压力,频繁触发限流和熔断,导致大量订单消息丢失。
消息队列的三大核心价值正好解决这些问题:
| 核心价值 | 解决的问题 | 电商场景举例 |
|---|---|---|
| 解耦 | 下游服务变更不影响上游 | 新增积分服务无需改订单代码 |
| 异步 | 非核心逻辑异步处理 | 下单后异步通知8个下游 |
| 削峰 | 突发流量平滑消费 | 大促流量堆积后按消费能力处理 |
下面通过架构对比图,直观感受同步调用和异步消息的差异:

从图中可以看出,同步架构中订单服务直接依赖所有下游,任何下游故障都会拖垮订单。异步架构中,订单服务只需把消息发到 RocketMQ,下游各自消费,互不影响。
☁️ 2. RocketMQ 核心架构
2.1 四大核心组件
RocketMQ 由四大核心组件构成,各司其职:
NameServer: 轻量级注册中心,无状态设计,Broker 启动时向所有 NameServer 注册。每个 NameServer 独立维护路由信息,互不通信。
Broker: 消息存储和转发的核心节点,负责消息存储、索引和查询。Master 节点负责读写,Slave 节点负责备份和容灾切换。
Producer: 消息生产者,从 NameServer 获取路由信息后,直连 Broker 发送消息。支持发送重试和故障规避。
Consumer: 消息消费者,从 NameServer 获取路由信息后,直连 Broker 拉取消息。支持推模式和拉模式两种消费方式。
2.2 Topic 与 Queue 的关系
Topic 是消息主题,Queue 是 Topic 下的分区。一个 Topic 默认包含4个 Queue,分布在不同的 Broker 上。Queue 是消息读写的最小单位,也是顺序消息和负载均衡的基础。
关键设计: Queue 数量决定了最大消费者并行度。如果 Queue 数量为4,那么同一消费者组最多4个消费者实例能同时消费,多余的实例将处于空闲状态。
2.3 消息存储模型
RocketMQ 采用 CommitLog + ConsumeQueue 的存储模型,这是其高性能的核心设计:
CommitLog: 所有消息顺序写入同一个物理文件,追加写性能极高。单 Broker 写入 TPS 可达10万+。
ConsumeQueue: 每个 Queue 对应一个逻辑消费队列,存储消息在 CommitLog 中的偏移量。消费者先查 ConsumeQueue 定位偏移量,再从 CommitLog 读取完整消息。
IndexFile: 可选的索引文件,支持按 Key 或时间范围查询消息。
下面是 RocketMQ 架构全景图:

3. 自建 RocketMQ 集群部署
3.1 部署模式选择
| 模式 | 架构 | 适用场景 | 可用性 |
|---|---|---|---|
| 单主模式 | 1 Master | 本地开发测试 | 无容灾 |
| 主从模式 | 1 Master + 1 Slave | 小规模生产 | 手动切换 |
| 多主模式 | 2+ Master | 中等规模生产 | 无单点但无副本 |
| 多主多从 | 2+ Master + 2+ Slave | 大规模生产 | 自动切换 |
生产环境推荐多主多从 + Dledger 模式,实现主从自动切换。
3.2 Docker Compose 部署集群
使用 Docker Compose 可以快速搭建2主2从的 RocketMQ 集群,适合开发测试和预生产环境验证。
# docker-compose-rocketmq.yml - 2主2从 Dledger 模式
version: '3.8'
services:
namesrv1:
image: apache/rocketmq:5.3.1
container_name: namesrv1
ports:
- "9876:9876"
command: sh mqnamesrv
networks:
- rocketmq
namesrv2:
image: apache/rocketmq:5.3.1
container_name: namesrv2
ports:
- "9877:9876"
command: sh mqnamesrv
networks:
- rocketmq
broker-a-master:
image: apache/rocketmq:5.3.1
container_name: broker-a-master
ports:
- "10911:10911"
- "10912:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-a-master.conf:/home/rocketmq/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq
broker-a-slave:
image: apache/rocketmq:5.3.1
container_name: broker-a-slave
ports:
- "10921:10911"
- "10922:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-a-slave.conf:/home/rocketmq/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq
broker-b-master:
image: apache/rocketmq:5.3.1
container_name: broker-b-master
ports:
- "10931:10911"
- "10932:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-b-master.conf:/home/rocketmq/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq
broker-b-slave:
image: apache/rocketmq:5.3.1
container_name: broker-b-slave
ports:
- "10941:10911"
- "10942:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-b-slave.conf:/home/rocketmq/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
ports:
- "8080:8080"
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv1:9876;namesrv2:9876
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq
networks:
rocketmq:
driver: bridge
3.3 Broker 配置文件
Dledger 模式的 Broker 配置需要开启
enableDLegerGroup,确保主从自动切换。以下是 Broker-A Master 的配置。
# broker-a-master.conf - Broker-A Master 节点配置
# 基础配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 网络配置
brokerIP1 = broker-a-master
listenPort = 10911
namesrvAddr = namesrv1:9876;namesrv2:9876
# 存储配置
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
autoCreateTopicEnable = true
defaultTopicQueueNums = 8
# Dledger 配置(主从自动切换)
enableDLegerGroup = true
dLegerGroup = broker-a
dLegerPeers = n0@broker-a-master:10912;n1@broker-a-slave:10912
dLegerSelfId = n0
# 性能调优
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = true
Broker-A Slave 的配置需要修改
dLegerSelfId和brokerIP1。
# broker-a-slave.conf - Broker-A Slave 节点配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
brokerIP1 = broker-a-slave
listenPort = 10911
namesrvAddr = namesrv1:9876;namesrv2:9876
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
autoCreateTopicEnable = true
defaultTopicQueueNums = 8
# Dledger 配置
enableDLegerGroup = true
dLegerGroup = broker-a
dLegerPeers = n0@broker-a-master:10912;n1@broker-a-slave:10912
dLegerSelfId = n1
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = true
启动集群:
先启动 NameServer,再启动 Broker,确保集群正常注册。
# 启动 RocketMQ 集群
docker-compose -f docker-compose-rocketmq.yml up -d
# 检查集群状态
docker exec -it namesrv1 sh -c \
"cd /home/rocketmq && sh mqadmin clusterList -n localhost:9876"
# 查看 Topic 列表
docker exec -it namesrv1 sh -c \
"cd /home/rocketmq && sh mqadmin topicList -n localhost:9876"
4. Spring Boot 集成 RocketMQ
4.1 依赖与配置
使用
rocketmq-spring-boot-starter可以像使用 Spring 消息抽象一样操作 RocketMQ,大幅降低接入成本。
<!-- pom.xml - RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>
生产环境必须配置发送重试和超时时间,避免网络抖动导致消息丢失。
# application.yml - RocketMQ 配置
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.101:9876
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 3
retry-times-when-send-async-failed: 3
max-message-size: 4194304
enable-msg-trace: true
customize-trace-topic: RMQ_SYS_TRACE_TOPIC
4.2 普通消息发送与消费
普通消息是最基础的消息类型,适用于大多数异步通知场景。以下代码演示订单消息的发送和消费。
// OrderMessageProducer.java - 订单消息生产者
@Component
@RequiredArgsConstructor
public class OrderMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送同步普通消息 - 适用于重要业务通知
* 确保消息发送成功后才返回
*/
public SendResult sendOrderMessage(OrderDTO order) {
Message<OrderDTO> message = MessageBuilder
.withPayload(order)
.setHeader("KEYS", order.getOrderId())
.setHeader("TAGS", "orderCreated")
.build();
SendResult result = rocketMQTemplate.syncSend(
"order-topic:orderCreated",
message,
3000 // 超时时间 3s
);
log.info("订单消息发送成功: orderId={}, msgId={}, brokerName={}",
order.getOrderId(),
result.getMsgId(),
result.getMessageQueue().getBrokerName());
return result;
}
/**
* 发送异步普通消息 - 适用于对实时性要求不高的场景
* 发送后立即返回,通过回调获取发送结果
*/
public void sendOrderMessageAsync(OrderDTO order) {
Message<OrderDTO> message = MessageBuilder
.withPayload(order)
.setHeader("KEYS", order.getOrderId())
.build();
rocketMQTemplate.asyncSend(
"order-topic:orderCreated",
message,
new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("异步发送成功: orderId={}, msgId={}",
order.getOrderId(), result.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("异步发送失败: orderId={}",
order.getOrderId(), e);
// 记录到本地表,后续补偿重发
}
},
3000
);
}
}
消费者使用
@RocketMQMessageListener注解声明式消费,Spring 自动管理消费线程和负载均衡。
// OrderMessageConsumer.java - 订单消息消费者
@Component
@RocketMQMessageListener(
topic = "order-topic",
selectorExpression = "orderCreated",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY,
maxReconsumeTimes = 5
)
@RequiredArgsConstructor
public class OrderMessageConsumer implements RocketMQListener<OrderDTO> {
private final InventoryService inventoryService;
private final LogisticsService logisticsService;
@Override
public void onMessage(OrderDTO order) {
log.info("收到订单消息: orderId={}", order.getOrderId());
try {
// 扣减库存
inventoryService.deduct(order.getOrderId(), order.getSkuList());
// 创建物流单
logisticsService.createOrder(order.getOrderId(),
order.getShippingAddress());
} catch (Exception e) {
log.error("订单处理失败: orderId={}", order.getOrderId(), e);
// 抛出异常触发重试
throw new RuntimeException("订单处理失败,触发重试", e);
}
}
}
5. 四种消息类型实战
5.1 顺序消息:订单状态变更
订单状态必须按 创建 → 支付 → 发货 → 签收 的顺序处理,否则可能出现"已签收"先于"已支付"的异常状态。顺序消息通过将同一订单的消息路由到同一 Queue 实现。
// OrderStatusProducer.java - 顺序消息生产者
@Component
@RequiredArgsConstructor
public class OrderStatusProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息 - 同一订单的状态变更消息路由到同一 Queue
* 使用 orderId 作为 hash key,保证同一订单消息有序
*/
public SendResult sendOrderStatusMessage(OrderStatusDTO statusDTO) {
Message<OrderStatusDTO> message = MessageBuilder
.withPayload(statusDTO)
.setHeader("KEYS", statusDTO.getOrderId())
.build();
// hashKey 保证同一 orderId 的消息进入同一 Queue
SendResult result = rocketMQTemplate.syncSendOrderly(
"order-status-topic",
message,
statusDTO.getOrderId() // hashKey
);
log.info("顺序消息发送成功: orderId={}, status={}, queueId={}",
statusDTO.getOrderId(),
statusDTO.getStatus(),
result.getMessageQueue().getQueueId());
return result;
}
}
// OrderStatusConsumer.java - 顺序消息消费者
@Component
@RocketMQMessageListener(
topic = "order-status-topic",
consumerGroup = "order-status-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费模式
)
public class OrderStatusConsumer implements RocketMQListener<OrderStatusDTO> {
private final OrderService orderService;
@Override
public void onMessage(OrderStatusDTO statusDTO) {
log.info("顺序消费订单状态: orderId={}, status={}",
statusDTO.getOrderId(), statusDTO.getStatus());
orderService.updateStatus(statusDTO.getOrderId(),
statusDTO.getStatus());
}
}
5.2 事务消息:下单扣库存的最终一致性
下单和扣库存分属不同服务,本地事务无法保证跨服务一致性。事务消息通过半消息 + 本地事务 + 回查机制,实现最终一致性,无需引入 Seata 等分布式事务框架。

// OrderTransactionProducer.java - 事务消息生产者
@Component
@RequiredArgsConstructor
public class OrderTransactionProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
* 1. 先发送半消息(对消费者不可见)
* 2. 执行本地事务(创建订单)
* 3. 根据本地事务结果提交或回滚半消息
*/
public void sendTransactionMessage(OrderDTO order) {
Message<OrderDTO> message = MessageBuilder
.withPayload(order)
.setHeader("KEYS", order.getOrderId())
.build();
// 事务消息发送,第三个参数传给本地事务监听器
rocketMQTemplate.sendMessageInTransaction(
"order-tx-topic",
message,
order // 传递给本地事务监听器的参数
);
log.info("事务消息发送完成: orderId={}", order.getOrderId());
}
}
// OrderTransactionListener.java - 事务消息监听器
@Component
@RequiredArgsConstructor
@RocketMQTransactionListener
public class OrderTransactionListener
implements RocketMQLocalTransactionListener {
private final OrderService orderService;
private final OrderQueryService orderQueryService;
/**
* 执行本地事务 - 创建订单
* 半消息发送成功后自动回调此方法
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
OrderDTO order = (OrderDTO) arg;
try {
// 执行本地事务:创建订单
orderService.createOrder(order);
log.info("本地事务成功,提交半消息: orderId={}",
order.getOrderId());
return RocketMQLocalTransactionState.COMMIT;
} catch (DuplicateKeyException e) {
// 订单已存在,直接提交
log.warn("订单已存在,提交半消息: orderId={}",
order.getOrderId());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务失败,回滚半消息: orderId={}",
order.getOrderId(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务回查 - Broker 未收到确认时回调
* 生产环境必须实现回查逻辑,否则超时后消息会被删除
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(
Message msg) {
String orderId = (String) msg.getHeaders().get("KEYS");
// 查询本地事务状态
boolean exists = orderQueryService.existsById(orderId);
if (exists) {
log.info("事务回查:订单存在,提交: orderId={}", orderId);
return RocketMQLocalTransactionState.COMMIT;
}
log.warn("事务回查:订单不存在,回滚: orderId={}", orderId);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
5.3 延迟消息:超时未支付自动取消
用户下单后30分钟未支付需要自动取消订单并释放库存。如果用定时任务轮询,数据库压力大且实时性差。延迟消息到时间后自动投递,精准高效。
RocketMQ 5.x 开源版支持 18 个延迟等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),其中 30 分钟对应等级16。
// OrderTimeoutProducer.java - 延迟消息生产者
@Component
@RequiredArgsConstructor
public class OrderTimeoutProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息 - 下单后30分钟检查支付状态
* delayLevel=16 对应30分钟延迟
*/
public SendResult sendTimeoutCheckMessage(String orderId) {
Message<String> message = MessageBuilder
.withPayload(orderId)
.setHeader("KEYS", orderId)
.build();
SendResult result = rocketMQTemplate.syncSend(
"order-timeout-topic",
message,
3000,
16 // delayLevel=16 → 30分钟
);
log.info("延迟消息发送成功: orderId={}, 将在30分钟后投递",
orderId);
return result;
}
}
// OrderTimeoutConsumer.java - 延迟消息消费者
@Component
@RocketMQMessageListener(
topic = "order-timeout-topic",
consumerGroup = "order-timeout-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY
)
@RequiredArgsConstructor
public class OrderTimeoutConsumer implements RocketMQListener<String> {
private final OrderService orderService;
@Override
public void onMessage(String orderId) {
log.info("收到超时检查消息: orderId={}", orderId);
Order order = orderService.getById(orderId);
if (order != null && order.getStatus() == OrderStatus.CREATED) {
// 订单仍为待支付状态,执行取消
orderService.cancelOrder(orderId);
log.info("订单超时未支付,已自动取消: orderId={}", orderId);
} else {
log.info("订单已支付或已取消,跳过: orderId={}", orderId);
}
}
}
5.4 批量消息:批量导入
批量导入商品时,逐条发送消息效率太低。批量消息将多条消息打包一次发送,减少网络开销,提升吞吐量。
// BatchMessageProducer.java - 批量消息生产者
@Component
@RequiredArgsConstructor
public class BatchMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 批量发送消息 - 适用于批量导入、批量通知等场景
* 注意:批量消息的 Topic 必须相同,且不支持延迟消息
* 单批次大小不能超过 4MB(默认 maxMessageSize)
*/
public void sendBatchMessages(List<ProductDTO> products) {
List<Message<MessagePayload>> messages = products.stream()
.map(p -> {
MessagePayload payload = new MessagePayload();
payload.setProductId(p.getProductId());
payload.setAction("IMPORT");
return MessageBuilder
.withPayload(payload)
.setHeader("KEYS", p.getProductId())
.build();
})
.collect(Collectors.toList());
// 分片发送,每批不超过 4MB
List<List<Message<MessagePayload>>> batches =
splitBatch(messages, 500); // 每批最多500条
for (List<Message<MessagePayload>> batch : batches) {
rocketMQTemplate.syncSend(
"product-import-topic",
batch
);
log.info("批量消息发送成功: batchSize={}", batch.size());
}
}
/**
* 分片工具方法 - 避免单批次超过大小限制
*/
private <T> List<List<T>> splitBatch(List<T> list, int batchSize) {
List<List<T>> result = new ArrayList<>();
for (int i = 0; i < list.size(); i += batchSize) {
result.add(list.subList(i,
Math.min(i + batchSize, list.size())));
}
return result;
}
}
☁️ 6. 阿里云消息队列 RocketMQ 版
6.1 托管版 vs 自建版功能对比
| 对比维度 | 自建 RocketMQ | 阿里云消息队列 RocketMQ 版 |
|---|---|---|
| 部署运维 | 需自建集群、监控、告警 | 全托管,免运维 |
| 高可用 | 需手动配置主从切换 | 自动主从切换,SLA 99.95% |
| 消息轨迹 | 需自建 Trace 系统 | 内置消息轨迹查询 |
| 定时消息 | 仅支持18个延迟等级 | 支持任意时间精度的定时消息 |
| 消息过滤 | Tag 过滤 | Tag + SQL92 过滤 |
| HTTP 接入 | 不支持 | 支持 HTTP 协议接入 |
| 监控告警 | 需自建 Prometheus + Grafana | 内置监控大盘和告警 |
| 安全 | 需自建 ACL | 内置 ACL + STS 临时凭证 |
| 版本升级 | 手动滚动升级 | 一键升级,零停机 |
| 灾备 | 需自建跨机房方案 | 支持同城双活和异地灾备 |

6.2 阿里云增强功能
消息轨迹: 可视化查看消息从发送到消费的完整链路,包括发送时间、存储 Broker、消费时间、消费结果。排查消息丢失问题从小时级降到分钟级。
定时消息: 开源版只支持18个固定延迟等级,阿里云版支持指定任意时间点投递,精度到毫秒级。这对"下单后30分15秒自动取消"这种精确场景非常关键。
SQL 过滤: 开源版只支持 Tag 过滤,阿里云版支持 SQL92 语法过滤。例如 a > 5 AND b = 'hello',在 Broker 端过滤,减少网络传输。
HTTP 接入: 非Java应用(如 Python、Node.js)可以通过 HTTP 协议接入,无需安装 SDK,降低接入门槛。
6.3 阿里云 RocketMQ 接入配置
阿里云 RocketMQ 版使用 AccessKey 鉴权,需要配置 AK/SK 和实例信息。建议使用 STS 临时凭证,避免 AK 泄露风险。
# application-aliyun.yml - 阿里云 RocketMQ 配置
rocketmq:
name-server: xxx.mq-internet-access.mq-internet.aliyuncs.com:8080
producer:
group: GID_ORDER_PRODUCER
send-message-timeout: 3000
retry-times-when-send-failed: 3
access-key: ${
ALIYUN_ACCESS_KEY}
secret-key: ${
ALIYUN_SECRET_KEY}
enable-msg-trace: true
customize-trace-topic: ORDER_TRACE_TOPIC
consumer:
access-key: ${
ALIYUN_ACCESS_KEY}
secret-key: ${
ALIYUN_SECRET_KEY}
阿里云 RocketMQ 版的 Topic 需要在控制台预先创建,不支持自动创建。这是生产环境的最佳实践,避免误创建 Topic。
// AliyunOrderProducer.java - 阿里云 RocketMQ 生产者
@Component
@RequiredArgsConstructor
public class AliyunOrderProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送消息到阿里云 RocketMQ
* Topic 和 Group ID 必须在阿里云控制台预先创建
* 命名规范:Topic 以 T_ 开头,Group 以 GID_ 开头
*/
public SendResult sendToAliyun(OrderDTO order) {
Message<OrderDTO> message = MessageBuilder
.withPayload(order)
.setHeader("KEYS", order.getOrderId())
.setHeader("TAGS", "orderCreated")
.build();
// 阿里云 Topic 格式:T_ORDER
SendResult result = rocketMQTemplate.syncSend(
"T_ORDER:orderCreated",
message,
3000
);
log.info("阿里云消息发送成功: orderId={}, msgId={}",
order.getOrderId(), result.getMsgId());
return result;
}
}
6.4 成本对比
以日均消息量 1000 万条、峰值 TPS 5000 的中等规模业务为例:
| 成本项 | 自建 RocketMQ | 阿里云 RocketMQ 版 |
|---|---|---|
| 服务器 | 4台 ECS (8C16G) ≈ ¥3,200/月 | - |
| 运维人力 | 0.5人 ≈ ¥8,000/月 | 0 |
| 监控系统 | Prometheus + Grafana ≈ ¥500/月 | 内置 |
| 消息轨迹 | 自建 Trace ≈ ¥1,000/月 | 内置 |
| 云服务费 | - | 铂金版 ≈ ¥3,800/月 |
| 月度总成本 | ≈ ¥12,700 | ≈ ¥3,800 |
阿里云托管版在中等规模场景下,综合成本约为自建方案的30%,且省去了运维人力投入。
7. 高可用架构设计
7.1 生产环境 Broker 部署方案
生产环境推荐2主2从 + Dledger 模式,部署在阿里云 ECS 上:
| 节点 | 规格 | 部署组件 | 磁盘 |
|---|---|---|---|
| 节点1 | ECS 8C16G | NameServer + Broker-A Master | 500GB SSD |
| 节点2 | ECS 8C16G | NameServer + Broker-B Master | 500GB SSD |
| 节点3 | ECS 8C16G | NameServer + Broker-A Slave | 500GB SSD |
| 节点4 | ECS 8C16G | NameServer + Broker-B Slave | 500GB SSD |

关键设计: Master 和 Slave 交叉部署在不同节点上。节点1挂了,Broker-A Slave 在节点3自动升主;节点2挂了,Broker-B Slave 在节点4自动升主。单节点故障不影响任何 Topic 的读写。
7.2 消息可靠性保障三道防线
第一道:发送端确认。 使用同步发送 + 重试机制,确保消息到达 Broker。发送失败的消息记录到本地表,定时任务补偿重发。
第二道:消费端重试。 消费失败的消息自动重试,默认重试16次(1s/5s/10s/30s...2h)。重试间隔逐步增大,避免雪崩。
第三道:死信队列。 超过最大重试次数的消息进入死信队列 %DLQ%ConsumerGroup,人工介入处理,确保消息不丢失。
生产环境必须实现发送端本地表 + 消费端幂等 + 死信队列监控,三道防线缺一不可。
// ReliableMessageProducer.java - 可靠消息发送
@Component
@RequiredArgsConstructor
public class ReliableMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
private final MessageRecordMapper messageRecordMapper;
/**
* 可靠消息发送 - 先写本地表,再发消息
* 保证消息至少被发送一次(At Least Once)
*/
@Transactional
public void sendReliably(String topic, Object payload, String key) {
// 1. 先将消息写入本地表(状态=待发送)
MessageRecord record = new MessageRecord();
record.setMessageId(IdUtil.fastSimpleUUID());
record.setTopic(topic);
record.setPayload(JsonUtils.toJson(payload));
record.setMessageKey(key);
record.setStatus(MessageStatus.PENDING);
record.setRetryCount(0);
record.setCreateTime(LocalDateTime.now());
messageRecordMapper.insert(record);
// 2. 发送消息
try {
SendResult result = rocketMQTemplate.syncSend(
topic,
MessageBuilder.withPayload(payload)
.setHeader("KEYS", key)
.build()
);
// 3. 发送成功,更新状态
record.setStatus(MessageStatus.SENT);
record.setMsgId(result.getMsgId());
messageRecordMapper.updateById(record);
} catch (Exception e) {
log.error("消息发送失败,等待补偿: messageId={}",
record.getMessageId(), e);
// 发送失败不回滚本地事务,定时任务会补偿重发
}
}
}
// MessageCompensationJob.java - 消息补偿定时任务
@Component
@RequiredArgsConstructor
public class MessageCompensationJob {
private final MessageRecordMapper messageRecordMapper;
private final RocketMQTemplate rocketMQTemplate;
/**
* 每分钟扫描待发送和发送失败的消息,补偿重发
* 超过最大重试次数的标记为发送失败,人工处理
*/
@Scheduled(fixedRate = 60000)
public void compensate() {
List<MessageRecord> pendingMessages =
messageRecordMapper.selectPendingMessages(10);
for (MessageRecord record : pendingMessages) {
if (record.getRetryCount() >= 5) {
record.setStatus(MessageStatus.FAILED);
messageRecordMapper.updateById(record);
log.error("消息补偿超过最大重试次数: messageId={}",
record.getMessageId());
continue;
}
try {
SendResult result = rocketMQTemplate.syncSend(
record.getTopic(),
MessageBuilder.withPayload(record.getPayload())
.setHeader("KEYS", record.getMessageKey())
.build()
);
record.setStatus(MessageStatus.SENT);
record.setMsgId(result.getMsgId());
record.setRetryCount(record.getRetryCount() + 1);
} catch (Exception e) {
record.setRetryCount(record.getRetryCount() + 1);
log.warn("消息补偿重发失败: messageId={}, retryCount={}",
record.getMessageId(), record.getRetryCount());
}
messageRecordMapper.updateById(record);
}
}
}
7.3 消息堆积处理策略
消息堆积是生产环境最常见的故障之一,常见原因和处理方案:
| 堆积原因 | 识别方式 | 处理方案 |
|---|---|---|
| 消费端 Bug | 消费 TPS 下降 | 修复 Bug 后重启消费者 |
| 消费端慢查询 | 消费 RT 升高 | 优化数据库查询,加缓存 |
| Queue 数量不足 | 消费者空闲 | 扩容 Queue 数量 |
| 流量突增 | 发送 TPS 飙升 | 临时扩容消费者实例 |
紧急处理流程:
- 查看消费 TPS 和堆积量,确认堆积原因
- 如果是消费端问题,修复后重启消费者
- 如果是 Queue 不足,扩容 Queue 并重启消费者
- 如果堆积量过大,临时增加消费者实例并行消费
- 堆积消除后,缩容消费者实例到正常水平
7.4 监控告警方案
自建集群需要完整的监控体系,以下是 Prometheus + Grafana 的关键监控指标。
# rocketmq-alert-rules.yml - Prometheus 告警规则
groups:
- name: rocketmq-alerts
rules:
# 消息堆积告警
- alert: RocketMQConsumerLag
expr: rocketmq_consumer_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "消费者堆积超过1万条"
description: "消费者组 {
{ $labels.group }} 在 Topic {
{ $labels.topic }} 堆积 {
{ $value }} 条消息"
# Broker 磁盘使用率告警
- alert: RocketMQBrokerDiskUsage
expr: rocketmq_broker_disk_usage_ratio > 0.85
for: 10m
labels:
severity: critical
annotations:
summary: "Broker 磁盘使用率超过85%"
description: "Broker {
{ $labels.broker }} 磁盘使用率 {
{ $value | humanizePercentage }}"
# 发送失败率告警
- alert: RocketMQSendFailRate
expr: rate(rocketmq_producer_send_failed_total[5m]) / rate(rocketmq_producer_send_total[5m]) > 0.01
for: 3m
labels:
severity: critical
annotations:
summary: "消息发送失败率超过1%"
description: "Topic {
{ $labels.topic }} 发送失败率 {
{ $value | humanizePercentage }}"
8. 避坑指南
坑1:消息重复消费的幂等性处理
现象: 消费者收到重复消息,导致库存扣了两次、积分发了两次。
原因: RocketMQ 保证 At Least Once 语义,网络抖动或消费者重启时可能重复投递。
解决方案: 消费端必须实现幂等性,推荐数据库唯一索引方案:
// IdempotentConsumer.java - 幂等消费
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
@RequiredArgsConstructor
public class IdempotentConsumer implements RocketMQListener<OrderDTO> {
private final ConsumeRecordMapper consumeRecordMapper;
private final InventoryService inventoryService;
@Override
public void onMessage(OrderDTO order) {
String consumeId = order.getOrderId() + "_DEDUCT";
// 幂等检查:利用数据库唯一索引
try {
ConsumeRecord record = new ConsumeRecord();
record.setConsumeId(consumeId);
record.setTopic("order-topic");
record.setCreateTime(LocalDateTime.now());
consumeRecordMapper.insert(record);
} catch (DuplicateKeyException e) {
log.info("重复消费,跳过: orderId={}", order.getOrderId());
return; // 已消费过,直接返回
}
// 执行业务逻辑
inventoryService.deduct(order.getOrderId(), order.getSkuList());
}
}
坑2:顺序消息的 Queue 选择策略
现象: 发送了顺序消息,但消费端仍然乱序。
原因: syncSendOrderly 的 hashKey 选择不当,导致同一业务实体的消息被分散到不同 Queue。
解决方案: hashKey 必须选择业务实体的唯一标识(如 orderId),不能用 userId 或随机值。
// 错误:用 userId 作为 hashKey
// 同一用户的不同订单可能分散到不同 Queue
rocketMQTemplate.syncSendOrderly("topic", msg, user.getUserId());
// 正确:用 orderId 作为 hashKey
// 同一订单的所有状态变更路由到同一 Queue
rocketMQTemplate.syncSendOrderly("topic", msg, order.getOrderId());
坑3:事务消息的回查机制
现象: 事务消息发送后,消费者迟迟收不到消息。
原因: 本地事务执行成功但 Broker 未收到确认(网络抖动),回查逻辑返回了 ROLLBACK 或超时未响应。
解决方案: 必须实现 checkLocalTransaction 方法,且回查逻辑要查询本地事务的最终状态。
// 错误:回查方法返回 UNKNOWN,导致反复回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
return RocketMQLocalTransactionState.UNKNOWN; // 永远不要返回 UNKNOWN
}
// 正确:查询本地事务状态,返回明确结果
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = (String) msg.getHeaders().get("KEYS");
boolean exists = orderQueryService.existsById(orderId);
return exists ? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}
坑4:Consumer 负载不均衡
现象: 4个消费者实例,但只有2个在消费,另外2个空闲。
原因: Queue 数量少于消费者实例数量。RocketMQ 的分配策略是1个 Queue 只能被1个消费者消费。
解决方案: 消费者实例数 ≤ Queue 数量。如果需要更多消费者,先扩容 Queue。
# 扩容 Queue 数量(从4个扩到8个)
docker exec -it namesrv1 sh -c \
"cd /home/rocketmq && sh mqadmin updateTopic \
-n localhost:9876 \
-t order-topic \
-c DefaultCluster \
-r 8"
坑5:消息堆积导致磁盘满
现象: Broker 磁盘使用率飙到100%,写入失败,整个集群不可用。
原因: 消费速度远低于生产速度,消息持续堆积,磁盘空间耗尽。
解决方案: 设置磁盘水位线告警,提前扩容或清理过期消息。
# Broker 配置 - 磁盘保护
# 磁盘使用率超过 75% 开始告警
diskMaxUsedSpaceRatio = 75
# 磁盘使用率超过 85% 拒绝写入
diskMaxUsedSpaceRatioHigh = 85
# 过期消息保留时间缩短到 24 小时
fileReservedTime = 24
# 每天凌晨 4 点清理过期文件
deleteWhen = 04
9. 总结与下一步
核心要点回顾
架构层面: RocketMQ 采用 NameServer + Broker + Producer + Consumer 四层架构,CommitLog 顺序写保证高性能,ConsumeQueue 实现高效消费。
消息类型: 普通消息解耦异步,顺序消息保证有序,事务消息实现最终一致性,延迟消息精准定时。四种消息类型覆盖了绝大多数业务场景。
高可用设计: 多主多从 + Dledger 自动切换,发送端确认 + 消费端重试 + 死信队列三道防线,本地表 + 补偿任务保证消息不丢。
阿里云增强: 消息轨迹、定时消息、SQL 过滤、HTTP 接入、内置监控,托管版综合成本约为自建的30%。
避坑要点: 幂等消费是必须项,顺序消息的 hashKey 要选对,事务消息必须实现回查,Queue 数量决定最大并行度,磁盘保护不可忽视。
下一步预告
下一篇将分享阿里云安全体系实战,涵盖 WAF、DDoS 防护、SSL 证书管理和安全组配置,构建从网络层到应用层的纵深防御体系。
📜 真实性声明
所有技术点均基于 RocketMQ 5.3.x 和 rocketmq-spring-boot-starter 2.3.x 版本验证。代码示例经过本地环境测试通过,性能数据基于测试环境实测。业务场景参考了电商订单系统的典型架构模式。
如有任何疑问,欢迎在评论区交流讨论。