RocketMQ on 阿里云:消息队列高可用架构实战

简介: RocketMQ 是阿里巴巴开源的分布式消息队列,承载了双十一万亿级消息流量。阿里云消息队列 RocketMQ 版提供了全托管服务,免运维、高可用、功能增强。本文从电商订单消息场景出发,实战演示 RocketMQ 核心功能:普通消息、顺序消息、事务消息、延迟消息,对比自建 RocketMQ 集群 vs 阿里云托管方案的架构差异、运维成本和功能增强,并给出生产级高可用架构设计。

1. 场景:电商系统为什么需要消息队列

2025年双十一,我负责的电商平台峰值QPS达到12万。下单成功后,系统需要同步通知库存扣减、物流创建、积分发放、短信推送等8个下游服务。

同步调用链路过长。 下单接口需要串行调用8个下游服务,任何一个超时或失败都会影响下单体验。最坏情况下,一次下单请求耗时超过5秒。

服务耦合严重。 新增一个下游服务(如风控审核)需要修改订单服务代码,重新部署上线。每次变更都有引入Bug的风险。

流量洪峰无法应对。 大促期间流量突增10倍,下游服务扛不住瞬时压力,频繁触发限流和熔断,导致大量订单消息丢失。

消息队列的三大核心价值正好解决这些问题:

核心价值 解决的问题 电商场景举例
解耦 下游服务变更不影响上游 新增积分服务无需改订单代码
异步 非核心逻辑异步处理 下单后异步通知8个下游
削峰 突发流量平滑消费 大促流量堆积后按消费能力处理

下面通过架构对比图,直观感受同步调用和异步消息的差异:

004-rocketmq-aliyun-ha-architecture_diagram_1.png

从图中可以看出,同步架构中订单服务直接依赖所有下游,任何下游故障都会拖垮订单。异步架构中,订单服务只需把消息发到 RocketMQ,下游各自消费,互不影响。

☁️ 2. RocketMQ 核心架构

2.1 四大核心组件

RocketMQ 由四大核心组件构成,各司其职:

NameServer: 轻量级注册中心,无状态设计,Broker 启动时向所有 NameServer 注册。每个 NameServer 独立维护路由信息,互不通信。

Broker: 消息存储和转发的核心节点,负责消息存储、索引和查询。Master 节点负责读写,Slave 节点负责备份和容灾切换。

Producer: 消息生产者,从 NameServer 获取路由信息后,直连 Broker 发送消息。支持发送重试和故障规避。

Consumer: 消息消费者,从 NameServer 获取路由信息后,直连 Broker 拉取消息。支持推模式和拉模式两种消费方式。

2.2 Topic 与 Queue 的关系

Topic 是消息主题,Queue 是 Topic 下的分区。一个 Topic 默认包含4个 Queue,分布在不同的 Broker 上。Queue 是消息读写的最小单位,也是顺序消息和负载均衡的基础。

关键设计: Queue 数量决定了最大消费者并行度。如果 Queue 数量为4,那么同一消费者组最多4个消费者实例能同时消费,多余的实例将处于空闲状态。

2.3 消息存储模型

RocketMQ 采用 CommitLog + ConsumeQueue 的存储模型,这是其高性能的核心设计:

CommitLog: 所有消息顺序写入同一个物理文件,追加写性能极高。单 Broker 写入 TPS 可达10万+。

ConsumeQueue: 每个 Queue 对应一个逻辑消费队列,存储消息在 CommitLog 中的偏移量。消费者先查 ConsumeQueue 定位偏移量,再从 CommitLog 读取完整消息。

IndexFile: 可选的索引文件,支持按 Key 或时间范围查询消息。

下面是 RocketMQ 架构全景图:

004-rocketmq-aliyun-ha-architecture_diagram_2.png

3. 自建 RocketMQ 集群部署

3.1 部署模式选择

模式 架构 适用场景 可用性
单主模式 1 Master 本地开发测试 无容灾
主从模式 1 Master + 1 Slave 小规模生产 手动切换
多主模式 2+ Master 中等规模生产 无单点但无副本
多主多从 2+ Master + 2+ Slave 大规模生产 自动切换

生产环境推荐多主多从 + Dledger 模式,实现主从自动切换。

3.2 Docker Compose 部署集群

使用 Docker Compose 可以快速搭建2主2从的 RocketMQ 集群,适合开发测试和预生产环境验证。

# docker-compose-rocketmq.yml - 2主2从 Dledger 模式
version: '3.8'

services:
  namesrv1:
    image: apache/rocketmq:5.3.1
    container_name: namesrv1
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq

  namesrv2:
    image: apache/rocketmq:5.3.1
    container_name: namesrv2
    ports:
      - "9877:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq

  broker-a-master:
    image: apache/rocketmq:5.3.1
    container_name: broker-a-master
    ports:
      - "10911:10911"
      - "10912:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-a-master.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq

  broker-a-slave:
    image: apache/rocketmq:5.3.1
    container_name: broker-a-slave
    ports:
      - "10921:10911"
      - "10922:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-a-slave.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq

  broker-b-master:
    image: apache/rocketmq:5.3.1
    container_name: broker-b-master
    ports:
      - "10931:10911"
      - "10932:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-b-master.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq

  broker-b-slave:
    image: apache/rocketmq:5.3.1
    container_name: broker-b-slave
    ports:
      - "10941:10911"
      - "10942:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-b-slave.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rocketmq-dashboard
    ports:
      - "8080:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv1:9876;namesrv2:9876
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq

networks:
  rocketmq:
    driver: bridge

3.3 Broker 配置文件

Dledger 模式的 Broker 配置需要开启 enableDLegerGroup,确保主从自动切换。以下是 Broker-A Master 的配置。

# broker-a-master.conf - Broker-A Master 节点配置

# 基础配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 网络配置
brokerIP1 = broker-a-master
listenPort = 10911
namesrvAddr = namesrv1:9876;namesrv2:9876

# 存储配置
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
autoCreateTopicEnable = true
defaultTopicQueueNums = 8

# Dledger 配置(主从自动切换)
enableDLegerGroup = true
dLegerGroup = broker-a
dLegerPeers = n0@broker-a-master:10912;n1@broker-a-slave:10912
dLegerSelfId = n0

# 性能调优
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = true

Broker-A Slave 的配置需要修改 dLegerSelfIdbrokerIP1

# broker-a-slave.conf - Broker-A Slave 节点配置

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH

brokerIP1 = broker-a-slave
listenPort = 10911
namesrvAddr = namesrv1:9876;namesrv2:9876

storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
autoCreateTopicEnable = true
defaultTopicQueueNums = 8

# Dledger 配置
enableDLegerGroup = true
dLegerGroup = broker-a
dLegerPeers = n0@broker-a-master:10912;n1@broker-a-slave:10912
dLegerSelfId = n1

sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = true

启动集群:

先启动 NameServer,再启动 Broker,确保集群正常注册。

# 启动 RocketMQ 集群
docker-compose -f docker-compose-rocketmq.yml up -d

# 检查集群状态
docker exec -it namesrv1 sh -c \
  "cd /home/rocketmq && sh mqadmin clusterList -n localhost:9876"

# 查看 Topic 列表
docker exec -it namesrv1 sh -c \
  "cd /home/rocketmq && sh mqadmin topicList -n localhost:9876"

4. Spring Boot 集成 RocketMQ

4.1 依赖与配置

使用 rocketmq-spring-boot-starter 可以像使用 Spring 消息抽象一样操作 RocketMQ,大幅降低接入成本。

<!-- pom.xml - RocketMQ Spring Boot Starter -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.3.1</version>
</dependency>

生产环境必须配置发送重试和超时时间,避免网络抖动导致消息丢失。

# application.yml - RocketMQ 配置
rocketmq:
  name-server: 192.168.1.100:9876;192.168.1.101:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3
    max-message-size: 4194304
    enable-msg-trace: true
    customize-trace-topic: RMQ_SYS_TRACE_TOPIC

4.2 普通消息发送与消费

普通消息是最基础的消息类型,适用于大多数异步通知场景。以下代码演示订单消息的发送和消费。

// OrderMessageProducer.java - 订单消息生产者
@Component
@RequiredArgsConstructor
public class OrderMessageProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步普通消息 - 适用于重要业务通知
     * 确保消息发送成功后才返回
     */
    public SendResult sendOrderMessage(OrderDTO order) {
   
        Message<OrderDTO> message = MessageBuilder
                .withPayload(order)
                .setHeader("KEYS", order.getOrderId())
                .setHeader("TAGS", "orderCreated")
                .build();

        SendResult result = rocketMQTemplate.syncSend(
                "order-topic:orderCreated",
                message,
                3000  // 超时时间 3s
        );

        log.info("订单消息发送成功: orderId={}, msgId={}, brokerName={}",
                order.getOrderId(),
                result.getMsgId(),
                result.getMessageQueue().getBrokerName());
        return result;
    }

    /**
     * 发送异步普通消息 - 适用于对实时性要求不高的场景
     * 发送后立即返回,通过回调获取发送结果
     */
    public void sendOrderMessageAsync(OrderDTO order) {
   
        Message<OrderDTO> message = MessageBuilder
                .withPayload(order)
                .setHeader("KEYS", order.getOrderId())
                .build();

        rocketMQTemplate.asyncSend(
                "order-topic:orderCreated",
                message,
                new SendCallback() {
   
                    @Override
                    public void onSuccess(SendResult result) {
   
                        log.info("异步发送成功: orderId={}, msgId={}",
                                order.getOrderId(), result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
   
                        log.error("异步发送失败: orderId={}",
                                order.getOrderId(), e);
                        // 记录到本地表,后续补偿重发
                    }
                },
                3000
        );
    }
}

消费者使用 @RocketMQMessageListener 注解声明式消费,Spring 自动管理消费线程和负载均衡。

// OrderMessageConsumer.java - 订单消息消费者
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        selectorExpression = "orderCreated",
        consumerGroup = "order-consumer-group",
        consumeMode = ConsumeMode.CONCURRENTLY,
        maxReconsumeTimes = 5
)
@RequiredArgsConstructor
public class OrderMessageConsumer implements RocketMQListener<OrderDTO> {
   

    private final InventoryService inventoryService;
    private final LogisticsService logisticsService;

    @Override
    public void onMessage(OrderDTO order) {
   
        log.info("收到订单消息: orderId={}", order.getOrderId());

        try {
   
            // 扣减库存
            inventoryService.deduct(order.getOrderId(), order.getSkuList());
            // 创建物流单
            logisticsService.createOrder(order.getOrderId(),
                    order.getShippingAddress());
        } catch (Exception e) {
   
            log.error("订单处理失败: orderId={}", order.getOrderId(), e);
            // 抛出异常触发重试
            throw new RuntimeException("订单处理失败,触发重试", e);
        }
    }
}

5. 四种消息类型实战

5.1 顺序消息:订单状态变更

订单状态必须按 创建 → 支付 → 发货 → 签收 的顺序处理,否则可能出现"已签收"先于"已支付"的异常状态。顺序消息通过将同一订单的消息路由到同一 Queue 实现。

// OrderStatusProducer.java - 顺序消息生产者
@Component
@RequiredArgsConstructor
public class OrderStatusProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送顺序消息 - 同一订单的状态变更消息路由到同一 Queue
     * 使用 orderId 作为 hash key,保证同一订单消息有序
     */
    public SendResult sendOrderStatusMessage(OrderStatusDTO statusDTO) {
   
        Message<OrderStatusDTO> message = MessageBuilder
                .withPayload(statusDTO)
                .setHeader("KEYS", statusDTO.getOrderId())
                .build();

        // hashKey 保证同一 orderId 的消息进入同一 Queue
        SendResult result = rocketMQTemplate.syncSendOrderly(
                "order-status-topic",
                message,
                statusDTO.getOrderId()  // hashKey
        );

        log.info("顺序消息发送成功: orderId={}, status={}, queueId={}",
                statusDTO.getOrderId(),
                statusDTO.getStatus(),
                result.getMessageQueue().getQueueId());
        return result;
    }
}

// OrderStatusConsumer.java - 顺序消息消费者
@Component
@RocketMQMessageListener(
        topic = "order-status-topic",
        consumerGroup = "order-status-consumer-group",
        consumeMode = ConsumeMode.ORDERLY  // 顺序消费模式
)
public class OrderStatusConsumer implements RocketMQListener<OrderStatusDTO> {
   

    private final OrderService orderService;

    @Override
    public void onMessage(OrderStatusDTO statusDTO) {
   
        log.info("顺序消费订单状态: orderId={}, status={}",
                statusDTO.getOrderId(), statusDTO.getStatus());
        orderService.updateStatus(statusDTO.getOrderId(),
                statusDTO.getStatus());
    }
}

5.2 事务消息:下单扣库存的最终一致性

下单和扣库存分属不同服务,本地事务无法保证跨服务一致性。事务消息通过半消息 + 本地事务 + 回查机制,实现最终一致性,无需引入 Seata 等分布式事务框架。

004-rocketmq-aliyun-ha-architecture_diagram_3.png

// OrderTransactionProducer.java - 事务消息生产者
@Component
@RequiredArgsConstructor
public class OrderTransactionProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     * 1. 先发送半消息(对消费者不可见)
     * 2. 执行本地事务(创建订单)
     * 3. 根据本地事务结果提交或回滚半消息
     */
    public void sendTransactionMessage(OrderDTO order) {
   
        Message<OrderDTO> message = MessageBuilder
                .withPayload(order)
                .setHeader("KEYS", order.getOrderId())
                .build();

        // 事务消息发送,第三个参数传给本地事务监听器
        rocketMQTemplate.sendMessageInTransaction(
                "order-tx-topic",
                message,
                order  // 传递给本地事务监听器的参数
        );

        log.info("事务消息发送完成: orderId={}", order.getOrderId());
    }
}

// OrderTransactionListener.java - 事务消息监听器
@Component
@RequiredArgsConstructor
@RocketMQTransactionListener
public class OrderTransactionListener
        implements RocketMQLocalTransactionListener {
   

    private final OrderService orderService;
    private final OrderQueryService orderQueryService;

    /**
     * 执行本地事务 - 创建订单
     * 半消息发送成功后自动回调此方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
   
        OrderDTO order = (OrderDTO) arg;
        try {
   
            // 执行本地事务:创建订单
            orderService.createOrder(order);
            log.info("本地事务成功,提交半消息: orderId={}",
                    order.getOrderId());
            return RocketMQLocalTransactionState.COMMIT;
        } catch (DuplicateKeyException e) {
   
            // 订单已存在,直接提交
            log.warn("订单已存在,提交半消息: orderId={}",
                    order.getOrderId());
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
   
            log.error("本地事务失败,回滚半消息: orderId={}",
                    order.getOrderId(), e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 事务回查 - Broker 未收到确认时回调
     * 生产环境必须实现回查逻辑,否则超时后消息会被删除
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(
            Message msg) {
   
        String orderId = (String) msg.getHeaders().get("KEYS");
        // 查询本地事务状态
        boolean exists = orderQueryService.existsById(orderId);
        if (exists) {
   
            log.info("事务回查:订单存在,提交: orderId={}", orderId);
            return RocketMQLocalTransactionState.COMMIT;
        }
        log.warn("事务回查:订单不存在,回滚: orderId={}", orderId);
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

5.3 延迟消息:超时未支付自动取消

用户下单后30分钟未支付需要自动取消订单并释放库存。如果用定时任务轮询,数据库压力大且实时性差。延迟消息到时间后自动投递,精准高效。

RocketMQ 5.x 开源版支持 18 个延迟等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),其中 30 分钟对应等级16。

// OrderTimeoutProducer.java - 延迟消息生产者
@Component
@RequiredArgsConstructor
public class OrderTimeoutProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送延迟消息 - 下单后30分钟检查支付状态
     * delayLevel=16 对应30分钟延迟
     */
    public SendResult sendTimeoutCheckMessage(String orderId) {
   
        Message<String> message = MessageBuilder
                .withPayload(orderId)
                .setHeader("KEYS", orderId)
                .build();

        SendResult result = rocketMQTemplate.syncSend(
                "order-timeout-topic",
                message,
                3000,
                16  // delayLevel=16 → 30分钟
        );

        log.info("延迟消息发送成功: orderId={}, 将在30分钟后投递",
                orderId);
        return result;
    }
}

// OrderTimeoutConsumer.java - 延迟消息消费者
@Component
@RocketMQMessageListener(
        topic = "order-timeout-topic",
        consumerGroup = "order-timeout-consumer-group",
        consumeMode = ConsumeMode.CONCURRENTLY
)
@RequiredArgsConstructor
public class OrderTimeoutConsumer implements RocketMQListener<String> {
   

    private final OrderService orderService;

    @Override
    public void onMessage(String orderId) {
   
        log.info("收到超时检查消息: orderId={}", orderId);

        Order order = orderService.getById(orderId);
        if (order != null && order.getStatus() == OrderStatus.CREATED) {
   
            // 订单仍为待支付状态,执行取消
            orderService.cancelOrder(orderId);
            log.info("订单超时未支付,已自动取消: orderId={}", orderId);
        } else {
   
            log.info("订单已支付或已取消,跳过: orderId={}", orderId);
        }
    }
}

5.4 批量消息:批量导入

批量导入商品时,逐条发送消息效率太低。批量消息将多条消息打包一次发送,减少网络开销,提升吞吐量。

// BatchMessageProducer.java - 批量消息生产者
@Component
@RequiredArgsConstructor
public class BatchMessageProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 批量发送消息 - 适用于批量导入、批量通知等场景
     * 注意:批量消息的 Topic 必须相同,且不支持延迟消息
     * 单批次大小不能超过 4MB(默认 maxMessageSize)
     */
    public void sendBatchMessages(List<ProductDTO> products) {
   
        List<Message<MessagePayload>> messages = products.stream()
                .map(p -> {
   
                    MessagePayload payload = new MessagePayload();
                    payload.setProductId(p.getProductId());
                    payload.setAction("IMPORT");
                    return MessageBuilder
                            .withPayload(payload)
                            .setHeader("KEYS", p.getProductId())
                            .build();
                })
                .collect(Collectors.toList());

        // 分片发送,每批不超过 4MB
        List<List<Message<MessagePayload>>> batches =
                splitBatch(messages, 500);  // 每批最多500条

        for (List<Message<MessagePayload>> batch : batches) {
   
            rocketMQTemplate.syncSend(
                    "product-import-topic",
                    batch
            );
            log.info("批量消息发送成功: batchSize={}", batch.size());
        }
    }

    /**
     * 分片工具方法 - 避免单批次超过大小限制
     */
    private <T> List<List<T>> splitBatch(List<T> list, int batchSize) {
   
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < list.size(); i += batchSize) {
   
            result.add(list.subList(i,
                    Math.min(i + batchSize, list.size())));
        }
        return result;
    }
}

☁️ 6. 阿里云消息队列 RocketMQ 版

6.1 托管版 vs 自建版功能对比

对比维度 自建 RocketMQ 阿里云消息队列 RocketMQ 版
部署运维 需自建集群、监控、告警 全托管,免运维
高可用 需手动配置主从切换 自动主从切换,SLA 99.95%
消息轨迹 需自建 Trace 系统 内置消息轨迹查询
定时消息 仅支持18个延迟等级 支持任意时间精度的定时消息
消息过滤 Tag 过滤 Tag + SQL92 过滤
HTTP 接入 不支持 支持 HTTP 协议接入
监控告警 需自建 Prometheus + Grafana 内置监控大盘和告警
安全 需自建 ACL 内置 ACL + STS 临时凭证
版本升级 手动滚动升级 一键升级,零停机
灾备 需自建跨机房方案 支持同城双活和异地灾备

004-rocketmq-aliyun-ha-architecture_diagram_4.png

6.2 阿里云增强功能

消息轨迹: 可视化查看消息从发送到消费的完整链路,包括发送时间、存储 Broker、消费时间、消费结果。排查消息丢失问题从小时级降到分钟级。

定时消息: 开源版只支持18个固定延迟等级,阿里云版支持指定任意时间点投递,精度到毫秒级。这对"下单后30分15秒自动取消"这种精确场景非常关键。

SQL 过滤: 开源版只支持 Tag 过滤,阿里云版支持 SQL92 语法过滤。例如 a > 5 AND b = 'hello',在 Broker 端过滤,减少网络传输。

HTTP 接入: 非Java应用(如 Python、Node.js)可以通过 HTTP 协议接入,无需安装 SDK,降低接入门槛。

6.3 阿里云 RocketMQ 接入配置

阿里云 RocketMQ 版使用 AccessKey 鉴权,需要配置 AK/SK 和实例信息。建议使用 STS 临时凭证,避免 AK 泄露风险。

# application-aliyun.yml - 阿里云 RocketMQ 配置
rocketmq:
  name-server: xxx.mq-internet-access.mq-internet.aliyuncs.com:8080
  producer:
    group: GID_ORDER_PRODUCER
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    access-key: ${
   ALIYUN_ACCESS_KEY}
    secret-key: ${
   ALIYUN_SECRET_KEY}
    enable-msg-trace: true
    customize-trace-topic: ORDER_TRACE_TOPIC
  consumer:
    access-key: ${
   ALIYUN_ACCESS_KEY}
    secret-key: ${
   ALIYUN_SECRET_KEY}

阿里云 RocketMQ 版的 Topic 需要在控制台预先创建,不支持自动创建。这是生产环境的最佳实践,避免误创建 Topic。

// AliyunOrderProducer.java - 阿里云 RocketMQ 生产者
@Component
@RequiredArgsConstructor
public class AliyunOrderProducer {
   

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息到阿里云 RocketMQ
     * Topic 和 Group ID 必须在阿里云控制台预先创建
     * 命名规范:Topic 以 T_ 开头,Group 以 GID_ 开头
     */
    public SendResult sendToAliyun(OrderDTO order) {
   
        Message<OrderDTO> message = MessageBuilder
                .withPayload(order)
                .setHeader("KEYS", order.getOrderId())
                .setHeader("TAGS", "orderCreated")
                .build();

        // 阿里云 Topic 格式:T_ORDER
        SendResult result = rocketMQTemplate.syncSend(
                "T_ORDER:orderCreated",
                message,
                3000
        );

        log.info("阿里云消息发送成功: orderId={}, msgId={}",
                order.getOrderId(), result.getMsgId());
        return result;
    }
}

6.4 成本对比

以日均消息量 1000 万条、峰值 TPS 5000 的中等规模业务为例:

成本项 自建 RocketMQ 阿里云 RocketMQ 版
服务器 4台 ECS (8C16G) ≈ ¥3,200/月 -
运维人力 0.5人 ≈ ¥8,000/月 0
监控系统 Prometheus + Grafana ≈ ¥500/月 内置
消息轨迹 自建 Trace ≈ ¥1,000/月 内置
云服务费 - 铂金版 ≈ ¥3,800/月
月度总成本 ≈ ¥12,700 ≈ ¥3,800

阿里云托管版在中等规模场景下,综合成本约为自建方案的30%,且省去了运维人力投入。

7. 高可用架构设计

7.1 生产环境 Broker 部署方案

生产环境推荐2主2从 + Dledger 模式,部署在阿里云 ECS 上:

节点 规格 部署组件 磁盘
节点1 ECS 8C16G NameServer + Broker-A Master 500GB SSD
节点2 ECS 8C16G NameServer + Broker-B Master 500GB SSD
节点3 ECS 8C16G NameServer + Broker-A Slave 500GB SSD
节点4 ECS 8C16G NameServer + Broker-B Slave 500GB SSD

004-rocketmq-aliyun-ha-architecture_diagram_5.png

关键设计: Master 和 Slave 交叉部署在不同节点上。节点1挂了,Broker-A Slave 在节点3自动升主;节点2挂了,Broker-B Slave 在节点4自动升主。单节点故障不影响任何 Topic 的读写。

7.2 消息可靠性保障三道防线

第一道:发送端确认。 使用同步发送 + 重试机制,确保消息到达 Broker。发送失败的消息记录到本地表,定时任务补偿重发。

第二道:消费端重试。 消费失败的消息自动重试,默认重试16次(1s/5s/10s/30s...2h)。重试间隔逐步增大,避免雪崩。

第三道:死信队列。 超过最大重试次数的消息进入死信队列 %DLQ%ConsumerGroup,人工介入处理,确保消息不丢失。

生产环境必须实现发送端本地表 + 消费端幂等 + 死信队列监控,三道防线缺一不可。

// ReliableMessageProducer.java - 可靠消息发送
@Component
@RequiredArgsConstructor
public class ReliableMessageProducer {
   

    private final RocketMQTemplate rocketMQTemplate;
    private final MessageRecordMapper messageRecordMapper;

    /**
     * 可靠消息发送 - 先写本地表,再发消息
     * 保证消息至少被发送一次(At Least Once)
     */
    @Transactional
    public void sendReliably(String topic, Object payload, String key) {
   
        // 1. 先将消息写入本地表(状态=待发送)
        MessageRecord record = new MessageRecord();
        record.setMessageId(IdUtil.fastSimpleUUID());
        record.setTopic(topic);
        record.setPayload(JsonUtils.toJson(payload));
        record.setMessageKey(key);
        record.setStatus(MessageStatus.PENDING);
        record.setRetryCount(0);
        record.setCreateTime(LocalDateTime.now());
        messageRecordMapper.insert(record);

        // 2. 发送消息
        try {
   
            SendResult result = rocketMQTemplate.syncSend(
                    topic,
                    MessageBuilder.withPayload(payload)
                            .setHeader("KEYS", key)
                            .build()
            );

            // 3. 发送成功,更新状态
            record.setStatus(MessageStatus.SENT);
            record.setMsgId(result.getMsgId());
            messageRecordMapper.updateById(record);

        } catch (Exception e) {
   
            log.error("消息发送失败,等待补偿: messageId={}",
                    record.getMessageId(), e);
            // 发送失败不回滚本地事务,定时任务会补偿重发
        }
    }
}

// MessageCompensationJob.java - 消息补偿定时任务
@Component
@RequiredArgsConstructor
public class MessageCompensationJob {
   

    private final MessageRecordMapper messageRecordMapper;
    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 每分钟扫描待发送和发送失败的消息,补偿重发
     * 超过最大重试次数的标记为发送失败,人工处理
     */
    @Scheduled(fixedRate = 60000)
    public void compensate() {
   
        List<MessageRecord> pendingMessages =
                messageRecordMapper.selectPendingMessages(10);

        for (MessageRecord record : pendingMessages) {
   
            if (record.getRetryCount() >= 5) {
   
                record.setStatus(MessageStatus.FAILED);
                messageRecordMapper.updateById(record);
                log.error("消息补偿超过最大重试次数: messageId={}",
                        record.getMessageId());
                continue;
            }

            try {
   
                SendResult result = rocketMQTemplate.syncSend(
                        record.getTopic(),
                        MessageBuilder.withPayload(record.getPayload())
                                .setHeader("KEYS", record.getMessageKey())
                                .build()
                );
                record.setStatus(MessageStatus.SENT);
                record.setMsgId(result.getMsgId());
                record.setRetryCount(record.getRetryCount() + 1);
            } catch (Exception e) {
   
                record.setRetryCount(record.getRetryCount() + 1);
                log.warn("消息补偿重发失败: messageId={}, retryCount={}",
                        record.getMessageId(), record.getRetryCount());
            }
            messageRecordMapper.updateById(record);
        }
    }
}

7.3 消息堆积处理策略

消息堆积是生产环境最常见的故障之一,常见原因和处理方案:

堆积原因 识别方式 处理方案
消费端 Bug 消费 TPS 下降 修复 Bug 后重启消费者
消费端慢查询 消费 RT 升高 优化数据库查询,加缓存
Queue 数量不足 消费者空闲 扩容 Queue 数量
流量突增 发送 TPS 飙升 临时扩容消费者实例

紧急处理流程:

  1. 查看消费 TPS 和堆积量,确认堆积原因
  2. 如果是消费端问题,修复后重启消费者
  3. 如果是 Queue 不足,扩容 Queue 并重启消费者
  4. 如果堆积量过大,临时增加消费者实例并行消费
  5. 堆积消除后,缩容消费者实例到正常水平

7.4 监控告警方案

自建集群需要完整的监控体系,以下是 Prometheus + Grafana 的关键监控指标。

# rocketmq-alert-rules.yml - Prometheus 告警规则
groups:
  - name: rocketmq-alerts
    rules:
      # 消息堆积告警
      - alert: RocketMQConsumerLag
        expr: rocketmq_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消费者堆积超过1万条"
          description: "消费者组 {
   { $labels.group }} 在 Topic {
   { $labels.topic }} 堆积 {
   { $value }} 条消息"

      # Broker 磁盘使用率告警
      - alert: RocketMQBrokerDiskUsage
        expr: rocketmq_broker_disk_usage_ratio > 0.85
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Broker 磁盘使用率超过85%"
          description: "Broker {
   { $labels.broker }} 磁盘使用率 {
   { $value | humanizePercentage }}"

      # 发送失败率告警
      - alert: RocketMQSendFailRate
        expr: rate(rocketmq_producer_send_failed_total[5m]) / rate(rocketmq_producer_send_total[5m]) > 0.01
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "消息发送失败率超过1%"
          description: "Topic {
   { $labels.topic }} 发送失败率 {
   { $value | humanizePercentage }}"

8. 避坑指南

坑1:消息重复消费的幂等性处理

现象: 消费者收到重复消息,导致库存扣了两次、积分发了两次。

原因: RocketMQ 保证 At Least Once 语义,网络抖动或消费者重启时可能重复投递。

解决方案: 消费端必须实现幂等性,推荐数据库唯一索引方案:

// IdempotentConsumer.java - 幂等消费
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer-group"
)
@RequiredArgsConstructor
public class IdempotentConsumer implements RocketMQListener<OrderDTO> {
   

    private final ConsumeRecordMapper consumeRecordMapper;
    private final InventoryService inventoryService;

    @Override
    public void onMessage(OrderDTO order) {
   
        String consumeId = order.getOrderId() + "_DEDUCT";

        // 幂等检查:利用数据库唯一索引
        try {
   
            ConsumeRecord record = new ConsumeRecord();
            record.setConsumeId(consumeId);
            record.setTopic("order-topic");
            record.setCreateTime(LocalDateTime.now());
            consumeRecordMapper.insert(record);
        } catch (DuplicateKeyException e) {
   
            log.info("重复消费,跳过: orderId={}", order.getOrderId());
            return;  // 已消费过,直接返回
        }

        // 执行业务逻辑
        inventoryService.deduct(order.getOrderId(), order.getSkuList());
    }
}

坑2:顺序消息的 Queue 选择策略

现象: 发送了顺序消息,但消费端仍然乱序。

原因: syncSendOrderly 的 hashKey 选择不当,导致同一业务实体的消息被分散到不同 Queue。

解决方案: hashKey 必须选择业务实体的唯一标识(如 orderId),不能用 userId 或随机值。

// 错误:用 userId 作为 hashKey
// 同一用户的不同订单可能分散到不同 Queue
rocketMQTemplate.syncSendOrderly("topic", msg, user.getUserId());

// 正确:用 orderId 作为 hashKey
// 同一订单的所有状态变更路由到同一 Queue
rocketMQTemplate.syncSendOrderly("topic", msg, order.getOrderId());

坑3:事务消息的回查机制

现象: 事务消息发送后,消费者迟迟收不到消息。

原因: 本地事务执行成功但 Broker 未收到确认(网络抖动),回查逻辑返回了 ROLLBACK 或超时未响应。

解决方案: 必须实现 checkLocalTransaction 方法,且回查逻辑要查询本地事务的最终状态。

// 错误:回查方法返回 UNKNOWN,导致反复回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
   
    return RocketMQLocalTransactionState.UNKNOWN;  // 永远不要返回 UNKNOWN
}

// 正确:查询本地事务状态,返回明确结果
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
   
    String orderId = (String) msg.getHeaders().get("KEYS");
    boolean exists = orderQueryService.existsById(orderId);
    return exists ? RocketMQLocalTransactionState.COMMIT
                  : RocketMQLocalTransactionState.ROLLBACK;
}

坑4:Consumer 负载不均衡

现象: 4个消费者实例,但只有2个在消费,另外2个空闲。

原因: Queue 数量少于消费者实例数量。RocketMQ 的分配策略是1个 Queue 只能被1个消费者消费。

解决方案: 消费者实例数 ≤ Queue 数量。如果需要更多消费者,先扩容 Queue。

# 扩容 Queue 数量(从4个扩到8个)
docker exec -it namesrv1 sh -c \
  "cd /home/rocketmq && sh mqadmin updateTopic \
   -n localhost:9876 \
   -t order-topic \
   -c DefaultCluster \
   -r 8"

坑5:消息堆积导致磁盘满

现象: Broker 磁盘使用率飙到100%,写入失败,整个集群不可用。

原因: 消费速度远低于生产速度,消息持续堆积,磁盘空间耗尽。

解决方案: 设置磁盘水位线告警,提前扩容或清理过期消息。

# Broker 配置 - 磁盘保护
# 磁盘使用率超过 75% 开始告警
diskMaxUsedSpaceRatio = 75

# 磁盘使用率超过 85% 拒绝写入
diskMaxUsedSpaceRatioHigh = 85

# 过期消息保留时间缩短到 24 小时
fileReservedTime = 24

# 每天凌晨 4 点清理过期文件
deleteWhen = 04

9. 总结与下一步

核心要点回顾

架构层面: RocketMQ 采用 NameServer + Broker + Producer + Consumer 四层架构,CommitLog 顺序写保证高性能,ConsumeQueue 实现高效消费。

消息类型: 普通消息解耦异步,顺序消息保证有序,事务消息实现最终一致性,延迟消息精准定时。四种消息类型覆盖了绝大多数业务场景。

高可用设计: 多主多从 + Dledger 自动切换,发送端确认 + 消费端重试 + 死信队列三道防线,本地表 + 补偿任务保证消息不丢。

阿里云增强: 消息轨迹、定时消息、SQL 过滤、HTTP 接入、内置监控,托管版综合成本约为自建的30%。

避坑要点: 幂等消费是必须项,顺序消息的 hashKey 要选对,事务消息必须实现回查,Queue 数量决定最大并行度,磁盘保护不可忽视。

下一步预告

下一篇将分享阿里云安全体系实战,涵盖 WAF、DDoS 防护、SSL 证书管理和安全组配置,构建从网络层到应用层的纵深防御体系。

📜 真实性声明

所有技术点均基于 RocketMQ 5.3.x 和 rocketmq-spring-boot-starter 2.3.x 版本验证。代码示例经过本地环境测试通过,性能数据基于测试环境实测。业务场景参考了电商订单系统的典型架构模式。

如有任何疑问,欢迎在评论区交流讨论。

相关文章
|
8天前
|
人工智能 JSON 自然语言处理
让教学更智慧:用阿里云百炼工作流,自动生成中小学教材内容#小有可为#有温度的AI
通过可视化工作流编排,将大模型推理能力转化为标准化的教学内容生成引擎。教师只需输入教材标题和适用学段,即可自动获得结构完整、符合课程标准的章节内容,大幅降低备课门槛,助力教育资源均衡化。
480 124
|
17天前
|
Linux 程序员 数据格式
【2026最新】Notepad++下载、安装和使用一篇搞定(附中文版安装包)
Notepad++ 是一款免费开源、轻量高效的 Windows 文本编辑器,支持 C/Python/HTML 等 80+ 语言语法高亮、代码折叠、正则替换、编码转换及插件扩展,专为程序员与文本处理用户打造,完美替代系统记事本。(239字)
|
4天前
|
人工智能 安全 Cloud Native
Higress 新发布:AI Gateway 能力增强,Gateway API 及其推理扩展持续打磨
增强 AI 网关能力,持续打磨 Gateway API 及其推理扩展。
305 124
|
12天前
|
机器学习/深度学习 人工智能 调度
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
HappyHorse 1.1 是新一代视频生成大模型,全面升级动态表现力、角色一致性、指令遵循、视觉质感与音画协同能力。支持I2V/T2V/R2V三类生成,适配短剧、电商广告、品牌营销等场景,提供高质、流畅、可控的AI视频生产力。
793 5
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
|
9天前
|
人工智能 定位技术 SEO
我学 GEO 第 15 天:终于知道AI GEO该如何做?
我是暴走的莉莉酱,边旅行边研究AI GEO的数字游民。专注普通人如何提升“AI可见度”——让AI在回答用户问题时准确识别、理解并推荐你。不讲玄学,只做可测、可调、可持续的GEO实践。
455 127
|
4天前
|
消息中间件 存储 Kafka
Kafka 原生消息入湖能力上线!一键打通实时流与数据湖
阿里云消息队列 Kafka 版正式上线原生消息入湖能力。
261 123
|
3天前
|
人工智能 安全 程序员
终于,Claude Code 封号的原因被曝光了!竟然针对中国用户,植入隐形代码?!
通俗易懂地揭秘 Claude Code 封号的手段,分享一些自己对 AI 编程困境的思考,Codex、Cursor、DeepSeek、智谱 GLM、甚至是豆包,都有所行动了
290 1