吃透 RocketMQ

简介: 本文全面介绍Apache RocketMQ分布式消息中间件的核心架构、底层原理和生产实践。首先解析RocketMQ四大核心组件(NameServer、Broker、Producer、Consumer)的职责与协作机制,重点剖析其高性能存储设计(CommitLog、ConsumeQueue、IndexFile)、刷盘策略和主从复制原理。随后详细讲解基于Dledger Raft协议的高可用集群部署方案,包含环境准备、配置优化和监控部署。

前言

Apache RocketMQ 是历经双十一万亿级流量考验的分布式消息与流处理平台,现已成为 Apache 顶级项目,凭借低延迟、高吞吐、金融级可靠性和云原生架构,成为国内微服务、事件驱动架构中的主流消息中间件。本文将从底层核心原理到生产级集群部署,再到业务实战,用通俗语言讲透底层逻辑,兼顾深度与可读性,让读者既能夯实基础,又能直接解决生产实际问题。

一、RocketMQ 核心架构与角色职责

RocketMQ 采用经典的发布-订阅架构,核心由四大组件构成,各组件职责清晰、协同工作,整体架构如下:

1.1 NameServer:轻量级路由注册中心

NameServer 是 RocketMQ 的“路由中枢”,采用无状态设计,集群节点间互不通信,核心职责如下:

  • 路由信息管理:维护 Broker 集群的元数据,包括 Broker 地址、主从角色、Topic 与队列的映射关系,支持 O(1) 时间复杂度的路由查询。
  • Broker 注册与心跳检测:Broker 启动后会每隔 30 秒向所有 NameServer 节点上报自身状态;NameServer 若 120 秒未收到 Broker 心跳,会将该节点从路由表中剔除。
  • 客户端路由服务:Producer 和 Consumer 启动时会从 NameServer 拉取路由信息,并定时更新,实现动态服务发现。

1.2 Broker:消息存储与转发核心

Broker 是 RocketMQ 的“心脏”,承担消息的存储、投递、查询和高可用控制,是整个系统性能和可靠性的核心载体,核心职责:

  • 消息持久化存储:通过 CommitLog、ConsumeQueue、IndexFile 三大文件实现消息的高性能存储与快速检索。
  • 消息投递:处理 Consumer 的拉取请求,将消息推送给消费者,维护消费进度。
  • 高可用保障:支持主从架构,通过同步/异步复制实现数据多副本;基于 Dledger Raft 协议实现自动主从切换,保障服务持续可用。
  • 功能调度:实现事务消息回查、延时消息、重试与死信队列等核心功能。

1.3 Producer:消息生产者

Producer 负责将业务消息发送到 Broker 集群,支持同步、异步、单向三种发送方式,内置负载均衡、失败重试、故障规避机制,保障消息发送的可靠性。

1.4 Consumer:消息消费者

Consumer 负责从 Broker 拉取消息并执行业务逻辑,支持集群消费和广播消费两种模式,内置负载均衡、重平衡、消费重试机制,保障消息消费的完整性。

二、RocketMQ 核心底层原理

2.1 消息存储核心原理

RocketMQ 的高性能和高可靠,核心源于其优秀的存储设计,采用“统一日志存储+逻辑索引分离”的架构,彻底解决了传统消息队列多 Topic 下的性能衰减问题。

2.1.1 三大核心存储文件

1. CommitLog:消息主体存储文件

所有 Topic 的所有消息都会顺序追加写入 CommitLog,彻底利用机械硬盘的顺序写入高性能特性,SSD 盘性能更优。

  • 单个文件默认大小 1GB,文件名由 20 位数字组成,代表文件的起始物理偏移量,例如 00000000000000000000,下一个文件为 00000000001073741824
  • 消息写入完全顺序,读取时通过偏移量随机访问,配合 PageCache 和零拷贝技术,实现超高的读写性能。
2. ConsumeQueue:消费逻辑索引

ConsumeQueue 是消息的逻辑索引,每个 Topic 的每个队列对应一个 ConsumeQueue 文件,解决了消费者全量扫描 CommitLog 的性能问题。

  • 单个文件默认包含 30 万条条目,每个条目固定 20 字节,结构为:8字节CommitLog物理偏移量 + 4字节消息大小 + 8字节Tag哈希值,单个文件大小约 5.72MB。
  • Consumer 消费时,先从 ConsumeQueue 获取消息的物理偏移量,再根据偏移量从 CommitLog 读取完整消息,实现精准的消息检索。
3. IndexFile:消息索引文件

IndexFile 用于根据 Message Key 快速查询消息,采用哈希索引结构,实现毫秒级的消息查询。

  • 单个 IndexFile 包含 40 字节文件头、500 万个哈希槽(每个4字节)、2000 万个索引条目(每个20字节),单个文件大小约 400MB。
  • 当需要根据 Message Key 检索消息时,先计算 Key 的哈希值,找到对应的哈希槽,再通过索引条目获取消息的 CommitLog 偏移量,最终读取完整消息。

2.1.2 高性能存储优化机制

1. 零拷贝技术

RocketMQ 采用内存映射(MMAP)+ Sendfile 零拷贝技术,减少用户态与内核态之间的数据拷贝,大幅提升消息读写性能:

  • MMAP:将 CommitLog 文件直接映射到用户态内存,操作内存等同于操作文件,避免了内核缓冲区到用户缓冲区的拷贝。
  • Sendfile:消息网络传输时,直接从内核缓冲区发送到网卡,无需经过用户态,彻底减少数据拷贝次数。
2. PageCache 页缓存优化

消息写入时优先写入操作系统的 PageCache,而非直接刷盘,由操作系统异步将脏页刷写到磁盘;消息读取时,若 PageCache 中存在该数据,直接从内存返回,无需磁盘 IO。

  • 顺序写入的特性让 PageCache 预读机制效果最大化,大幅提升读取性能。
  • 冷热数据分离,热数据常驻 PageCache,实现内存级的读写性能。

2.1.3 刷盘机制:平衡性能与可靠性

RocketMQ 提供两种刷盘策略,用户可根据业务对可靠性和性能的需求灵活选择。

刷盘模式 核心逻辑 优点 缺点 适用场景
同步刷盘(SYNC_FLUSH) 消息写入 PageCache 后,立即调用 fsync 强制刷写到磁盘,刷盘成功后才返回发送成功 数据可靠性极高,机器宕机不会丢失消息 写入性能有损耗,延迟增加 金融、支付等对数据零丢失要求极高的场景
异步刷盘(ASYNC_FLUSH) 消息写入 PageCache 后立即返回发送成功,后台线程每隔固定时间(默认500ms)批量刷盘 写入性能极高,吞吐量和延迟表现优秀 机器宕机时会丢失 PageCache 中未刷盘的消息 大部分互联网业务,对性能要求高,允许极少量消息丢失的场景

2.1.4 主从复制与高可用

RocketMQ 提供两种主从数据复制模式,配合刷盘机制,实现不同等级的数据可靠性保障。

复制模式 核心逻辑 优点 缺点
同步复制 Master 收到消息后,需等待 Slave 同步完成并返回 ACK,才会给 Producer 返回发送成功 主备数据完全一致,Master 宕机后数据零丢失,Slave 可提供读服务 消息发送延迟增加,吞吐量有一定损耗
异步复制 Master 收到消息写入成功后,立即返回发送成功,后台异步将数据同步给 Slave 发送延迟低,吞吐量高,性能损耗极小 Master 宕机时,Slave 可能未同步完数据,会丢失少量消息

2.2 消息发送核心原理

2.2.1 三种消息发送方式

RocketMQ 提供三种消息发送方式,适配不同的业务场景:

  1. 同步发送:Producer 发送消息后,阻塞等待 Broker 的返回结果,直到发送成功或超时失败。可靠性最高,适用于订单、支付等核心消息场景。
  2. 异步发送:Producer 发送消息时注册回调函数,非阻塞等待 Broker 的返回结果,发送线程不会阻塞。适用于吞吐量要求高、需要处理发送结果的场景。
  3. 单向发送:Producer 发送消息后,不等待 Broker 的返回结果,也不注册回调,发送后立即返回。性能最高,可靠性最低,适用于日志收集等非核心消息场景。

2.2.2 消息发送全流程

消息发送的完整流程如下,内置负载均衡、故障重试机制,保障消息发送的可靠性:

2.2.3 事务消息原理

RocketMQ 原生支持事务消息,基于两阶段提交+事务回查机制,实现本地事务与消息发送的原子性,彻底解决分布式事务问题。

事务消息完整流程:

  1. 半消息发送:Producer 发送半消息(Half Message)到 Broker,Broker 将消息存储到系统内置的半消息主题中,此时消息对消费者不可见,返回发送成功给 Producer。
  2. 本地事务执行:Producer 收到半消息发送成功的响应后,执行本地事务逻辑。
  3. 二次确认提交:Producer 根据本地事务执行结果,向 Broker 发送二次确认指令:
  • 事务执行成功:发送 Commit 指令,Broker 将半消息从半消息主题转移到真实业务主题,对消费者可见。
  • 事务执行失败:发送 Rollback 指令,Broker 直接删除半消息,不会投递给消费者。
  1. 事务状态回查:若发生网络异常,Broker 未收到二次确认指令,Broker 会定时回查 Producer 的本地事务状态(默认间隔1分钟,最多回查15次),若超过回查次数仍无法确认,Broker 将自动 Rollback 消息。

2.3 消息消费核心原理

2.3.1 两种消费模式

RocketMQ 提供两种消费模式,适配不同的业务场景:

  1. 集群消费:同一个消费组(Consumer Group)的多个消费者,共同消费 Topic 的队列,一个队列只会被同一个消费组的一个消费者消费,实现负载均衡,消息只会被消费一次。这是生产环境最常用的消费模式。
  2. 广播消费:同一个消费组的所有消费者,都会消费 Topic 的所有队列,每条消息都会被消费组内的每一个消费者消费一次。适用于配置刷新、全局通知等场景。

2.3.2 推模式底层实现

RocketMQ 的推模式(Push Consumer)本质上是对拉模式的封装,底层采用长轮询机制实现:

  • Consumer 主动向 Broker 发起拉取请求,若 Broker 有对应消息,立即返回消息给 Consumer。
  • 若 Broker 没有对应消息,不会立即返回空结果,而是 hold 住该请求,直到有新消息到达或长轮询超时(默认15秒),再返回响应给 Consumer。
  • 长轮询机制既保证了消息消费的实时性,又避免了 Consumer 频繁轮询带来的性能损耗。

2.3.3 消费重试与死信队列

RocketMQ 提供完善的消费重试与死信机制,保障消息不会因消费失败而丢失:

  1. 消费重试:集群消费模式下,消息消费失败后,Consumer 抛出异常会触发重试,消息会被发送到重试主题(%RETRY%+消费组名),每个消费组有独立的重试队列。重试次数默认16次,每次重试的间隔时间递增,从1秒到2小时不等。
  2. 死信队列:当消息重试次数超过最大值(默认16次)后,消息会被投递到死信主题(%DLQ%+消费组名),死信队列中的消息不会再被自动消费,需要人工干预处理,避免消息无限重试占用系统资源。

2.4 负载均衡与重平衡原理

2.4.1 Producer 端负载均衡

Producer 从 NameServer 获取 Topic 的路由信息后,默认采用轮询策略将消息发送到不同的队列,实现 Broker 集群的负载均衡。同时支持自定义队列选择策略,例如根据 Message Key 哈希取模,保证同一个 Key 的消息发送到同一个队列,实现严格的顺序消息。

2.4.2 Consumer 端负载均衡与重平衡

集群消费模式下,同一个消费组的消费者会对 Topic 的队列进行分配,核心规则是一个队列只能被同一个消费组的一个消费者消费,一个消费者可以消费多个队列。

RocketMQ 提供多种队列分配策略,默认采用平均分配策略,同时支持环形分配、一致性哈希、机房就近分配等策略。

当消费组内的消费者数量发生变化(上下线)、Topic 的队列数量发生变化时,会触发重平衡(Rebalance),重新分配队列,保证消费的负载均衡。重平衡完整流程如下:

三、RocketMQ 生产级集群部署实战

3.1 集群模式选型

生产环境需根据业务的可靠性、性能、可用性要求,选择合适的集群模式,各模式对比与适用场景如下:

集群模式 核心优势 核心劣势 适用场景
多Master多Slave(异步复制) 性能极高,吞吐量优秀,无单点故障,Master宕机后可从Slave继续消费 Master宕机时会丢失少量未同步的消息 绝大多数互联网核心业务,对性能要求高,允许极少量消息丢失
多Master多Slave(同步复制) 数据零丢失,金融级可靠性,无单点故障,Master宕机后数据完整 发送延迟更高,吞吐量有一定损耗 金融、支付、交易等对数据可靠性要求极高的场景
Dledger Raft模式 自动主从切换,真正的高可用,数据强一致,无需人工干预故障转移 部署复杂度略高,性能比异步复制略有损耗 生产核心业务,要求故障自动转移、服务零中断的场景

本文以3节点Dledger Raft集群为例,讲解生产级集群的完整部署流程,基于Apache RocketMQ 5.4.0,适配 JDK 17,实现自动主从切换、数据强一致、高可用。

3.2 环境准备

3.2.1 机器规划

机器IP 部署角色 核心端口
192.168.1.101 NameServer、Broker 9876、10911、10909、40911
192.168.1.102 NameServer、Broker 9876、10911、10909、40911
192.168.1.103 NameServer、Broker 9876、10911、10909、40911

3.2.2 软件与硬件要求

  • 操作系统:CentOS 7.9+/Ubuntu 22.04+,生产环境优先选用稳定版 Linux 系统。
  • JDK:JDK 17 ,RocketMQ 5.4.0 完全适配 JDK 17。
  • RocketMQ:Apache RocketMQ 5.4.0 。
  • 硬件要求:CPU 8核16线程以上,内存32G以上,SSD固态硬盘1TB以上,万兆网卡。

3.2.3 操作系统优化

# 1. 关闭防火墙和SELinux
systemctl stop firewalld && systemctl disable firewalld
setenforce 0 && sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config

# 2. 优化文件句柄数
echo "* soft nofile 655350" >> /etc/security/limits.conf
echo "* hard nofile 655350" >> /etc/security/limits.conf
echo "* soft nproc 655350" >> /etc/security/limits.conf
echo "* hard nproc 655350" >> /etc/security/limits.conf

# 3. 虚拟内存优化,关闭swap分区
echo "vm.swappiness=0" >> /etc/sysctl.conf
sysctl -p

# 4. 关闭透明大页,避免内存碎片化
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag

3.3 集群部署步骤

步骤1:JDK 17 安装与配置(所有机器执行)

# 下载JDK 17最新稳定版
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
# 解压到指定目录
tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/
# 配置环境变量
cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk-17.0.12
export PATH=\$JAVA_HOME/bin:\$PATH
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
EOF
# 生效环境变量
source /etc/profile
# 验证安装
java -version

步骤2:RocketMQ 安装与环境配置(所有机器执行)

# 下载Apache RocketMQ 5.4.0最新稳定版
wget https://archive.apache.org/dist/rocketmq/5.4.0/rocketmq-all-5.4.0-bin-release.zip
# 解压
unzip rocketmq-all-5.4.0-bin-release.zip -d /usr/local/
# 重命名
mv /usr/local/rocketmq-all-5.4.0-bin-release /usr/local/rocketmq
# 配置环境变量
cat >> /etc/profile << EOF
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=\$ROCKETMQ_HOME/bin:\$PATH
EOF
# 生效环境变量
source /etc/profile
# 创建消息存储目录
mkdir -p /data/rocketmq/store
chmod -R 777 /data/rocketmq/store

步骤3:JVM 参数优化(适配JDK 17,所有机器执行)

修改 NameServer JVM 配置文件 bin/runserver.sh

vi /usr/local/rocketmq/bin/runserver.sh
# 修改核心JVM参数
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# JDK 17 反射权限适配参数
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.lang=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.math=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.io=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.nio=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.net=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util.concurrent=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/sun.nio.cs=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"

修改 Broker JVM 配置文件 bin/runbroker.sh

vi /usr/local/rocketmq/bin/runbroker.sh
# 修改核心JVM参数
JAVA_OPT="${JAVA_OPT} -server -Xms16g -Xmx16g -Xmn8g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"
# JDK 17 反射权限适配参数(同runserver.sh,添加相同的--add-opens参数)

步骤4:Broker 配置文件编写(Dledger模式)

192.168.1.101 机器配置文件 conf/dledger/broker-n0.conf

brokerClusterName=RocketMQ-Dledger-Cluster

brokerName=RaftNode00

brokerId=-1

namesrvAddr=192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876

listenPort=10911

storePathRootDir=/data/rocketmq/store

storePathCommitLog=/data/rocketmq/store/commitlog

storePathConsumeQueue=/data/rocketmq/store/consumequeue

storePathIndex=/data/rocketmq/store/index

flushDiskType=ASYNC_FLUSH

sendMessageThreadPoolNums=32

pullMessageThreadPoolNums=32

enableDLegerCommitLog=true

dLegerGroup=RaftNode00

dLegerPeers=n0-192.168.1.101:40911;n1-192.168.1.102:40911;n2-192.168.1.103:40911

dLegerSelfId=n0

enableTrace=true

autoCreateTopicEnable=false

autoCreateSubscriptionGroup=false

192.168.1.102 机器配置文件 conf/dledger/broker-n1.conf,仅修改 dLegerSelfId=n1,其余配置完全一致。 192.168.1.103 机器配置文件 conf/dledger/broker-n2.conf,仅修改 dLegerSelfId=n2,其余配置完全一致。

步骤5:启动 NameServer 集群(所有机器执行)

# 后台启动NameServer
nohup sh /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/namesrv.log 2>&1 &
# 查看启动日志,出现"The Name Server boot success"表示启动成功
tail -f /usr/local/rocketmq/logs/namesrv.log

步骤6:启动 Broker 集群(对应机器执行)

192.168.1.101 机器:

nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n0.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &

192.168.1.102 机器:

nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n1.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &

192.168.1.103 机器:

nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n2.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &

查看启动日志,出现The broker boot success表示启动成功。

步骤7:集群状态验证

# 查看集群节点状态
sh /usr/local/rocketmq/bin/mqadmin clusterList -n 192.168.1.101:9876

执行后可看到3个Broker节点,其中1个为Master,另外2个为Slave,集群部署成功。

步骤8:RocketMQ Dashboard 可视化控制台部署

# 下载RocketMQ Dashboard 2.0.0最新稳定版
wget https://archive.apache.org/dist/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0.jar
# 后台启动Dashboard
nohup java -jar rocketmq-dashboard-2.0.0.jar --rocketmq.config.namesrvAddr=192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876 --server.port=8080 > dashboard.log 2>&1 &

启动后访问 http://机器IP:8080 即可进入可视化控制台,管理集群、Topic、消息、消费者等资源。

四、Spring Boot 3.x 整合 RocketMQ 实战

本实战基于 JDK 17、Spring Boot 3.3.5、RocketMQ 5.4.0,包含完整的消息发送、消费、幂等处理、业务落地全流程。

4.1 项目依赖配置 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.3.5</version>
       <relativePath/>
   </parent>
   <groupId>com.jam.demo</groupId>
   <artifactId>rocketmq-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>rocketmq-demo</name>
   <description>RocketMQ Spring Boot Demo</description>
   <properties>
       <java.version>17</java.version>
       <rocketmq.version>2.5.0</rocketmq.version>
       <mybatis-plus.version>3.5.7</mybatis-plus.version>
       <fastjson2.version>2.0.53</fastjson2.version>
       <guava.version>33.2.0-jre</guava.version>
       <lombok.version>1.18.34</lombok.version>
       <springdoc.version>2.6.0</springdoc.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-spring-boot-starter</artifactId>
           <version>${rocketmq.version}</version>
       </dependency>
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>${springdoc.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

4.2 应用配置文件 application.yml

server:
 port: 8081
spring:
 application:
   name: rocketmq-demo
 datasource:
   url: jdbc:mysql://127.0.0.1:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
   username: root
   password: root
   driver-class-name: com.mysql.cj.jdbc.Driver
rocketmq:
 name-server: 192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876
 producer:
   group: demo-producer-group
   send-message-timeout: 3000
   retry-times-when-send-failed: 2
   retry-times-when-send-async-failed: 2
mybatis-plus:
 mapper-locations: classpath*:/mapper/**/*.xml
 configuration:
   map-underscore-to-camel-case: true
 global-config:
   db-config:
     id-type: auto
springdoc:
 api-docs:
   enabled: true
   path: /v3/api-docs
 swagger-ui:
   enabled: true
   path: /swagger-ui.html

4.3 MySQL 数据库表结构(MySQL 8.0 可直接执行)

CREATE DATABASE IF NOT EXISTS rocketmq_demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE rocketmq_demo;

-- 消息消费幂等表
DROP TABLE IF EXISTS t_message_idempotent;
CREATE TABLE t_message_idempotent (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   message_id VARCHAR(64) NOT NULL COMMENT '消息唯一ID',
   topic VARCHAR(128) NOT NULL COMMENT '消息主题',
   consumer_group VARCHAR(128) NOT NULL COMMENT '消费组',
   consume_status TINYINT NOT NULL DEFAULT 0 COMMENT '消费状态 0-未消费 1-已消费',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_msg_id (message_id, topic, consumer_group) COMMENT '消息唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息消费幂等表';

-- 订单业务表
DROP TABLE IF EXISTS t_order;
CREATE TABLE t_order (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '订单ID',
   order_no VARCHAR(32) NOT NULL COMMENT '订单编号',
   user_id BIGINT NOT NULL COMMENT '用户ID',
   amount DECIMAL(10,2) NOT NULL COMMENT '订单金额',
   order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态 0-待支付 1-已支付 2-已取消',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_order_no (order_no) COMMENT '订单编号唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';

4.4 核心实体类

4.4.1 消息幂等实体类

package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息消费幂等实体类
* @author ken
* @date 2026-03-03
*/

@Data
@TableName("t_message_idempotent")
@Schema(description = "消息消费幂等实体")
public class MessageIdempotent {
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;
   @Schema(description = "消息唯一ID")
   private String messageId;
   @Schema(description = "消息主题")
   private String topic;
   @Schema(description = "消费组")
   private String consumerGroup;
   @Schema(description = "消费状态 0-未消费 1-已消费")
   private Integer consumeStatus;
   @Schema(description = "创建时间")
   private LocalDateTime createTime;
   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

4.4.2 订单实体类

package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体类
* @author ken
* @date 2026-03-03
*/

@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order {
   @TableId(type = IdType.AUTO)
   @Schema(description = "订单ID")
   private Long id;
   @Schema(description = "订单编号")
   private String orderNo;
   @Schema(description = "用户ID")
   private Long userId;
   @Schema(description = "订单金额")
   private BigDecimal amount;
   @Schema(description = "订单状态 0-待支付 1-已支付 2-已取消")
   private Integer orderStatus;
   @Schema(description = "创建时间")
   private LocalDateTime createTime;
   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

4.4.3 订单消息DTO

package com.jam.demo.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
/**
* 订单消息DTO
* @author ken
* @date 2026-03-03
*/

@Data
@Schema(description = "订单消息DTO")
public class OrderMessageDTO implements Serializable {
   private static final long serialVersionUID = 1L;
   @Schema(description = "订单编号")
   private String orderNo;
   @Schema(description = "用户ID")
   private Long userId;
   @Schema(description = "订单金额")
   private BigDecimal amount;
   @Schema(description = "订单状态")
   private Integer orderStatus;
}

4.5 Mapper 数据访问层

4.5.1 消息幂等Mapper

package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.MessageIdempotent;
import org.apache.ibatis.annotations.Mapper;
/**
* 消息幂等Mapper
* @author ken
* @date 2026-03-03
*/

@Mapper
public interface MessageIdempotentMapper extends BaseMapper<MessageIdempotent> {
}

4.5.2 订单Mapper

package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper
* @author ken
* @date 2026-03-03
*/

@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}

4.6 Service 业务逻辑层

4.6.1 消息幂等服务接口

package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.entity.MessageIdempotent;
/**
* 消息幂等服务接口
* @author ken
* @date 2026-03-03
*/

public interface MessageIdempotentService extends IService<MessageIdempotent> {
   /**
    * 检查消息是否已经消费
    * @param messageId 消息唯一ID
    * @param topic 消息主题
    * @param consumerGroup 消费组
    * @return true-已消费 false-未消费
    */

   boolean checkMessageConsumed(String messageId, String topic, String consumerGroup);
   /**
    * 标记消息为已消费
    * @param messageId 消息唯一ID
    * @param topic 消息主题
    * @param consumerGroup 消费组
    * @return 操作结果
    */

   boolean markMessageConsumed(String messageId, String topic, String consumerGroup);
}

4.6.2 消息幂等服务实现类

package com.jam.demo.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.entity.MessageIdempotent;
import com.jam.demo.mapper.MessageIdempotentMapper;
import com.jam.demo.service.MessageIdempotentService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
/**
* 消息幂等服务实现类
* @author ken
* @date 2026-03-03
*/

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageIdempotentServiceImpl extends ServiceImpl<MessageIdempotentMapper, MessageIdempotent> implements MessageIdempotentService {
   private final PlatformTransactionManager transactionManager;
   @Override
   public boolean checkMessageConsumed(String messageId, String topic, String consumerGroup) {
       if (!org.springframework.util.StringUtils.hasText(messageId) || !org.springframework.util.StringUtils.hasText(topic) || !org.springframework.util.StringUtils.hasText(consumerGroup)) {
           log.error("消息幂等校验参数异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup);
           return false;
       }
       LambdaQueryWrapper<MessageIdempotent> queryWrapper = new LambdaQueryWrapper<>();
       queryWrapper.eq(MessageIdempotent::getMessageId, messageId)
               .eq(MessageIdempotent::getTopic, topic)
               .eq(MessageIdempotent::getConsumerGroup, consumerGroup)
               .eq(MessageIdempotent::getConsumeStatus, 1);
       MessageIdempotent messageIdempotent = this.getOne(queryWrapper);
       return !ObjectUtils.isEmpty(messageIdempotent);
   }
   @Override
   public boolean markMessageConsumed(String messageId, String topic, String consumerGroup) {
       if (!org.springframework.util.StringUtils.hasText(messageId) || !org.springframework.util.StringUtils.hasText(topic) || !org.springframework.util.StringUtils.hasText(consumerGroup)) {
           log.error("标记消息消费状态参数异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup);
           return false;
       }
       DefaultTransactionDefinition def = new DefaultTransactionDefinition();
       def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
       TransactionStatus status = transactionManager.getTransaction(def);
       try {
           MessageIdempotent messageIdempotent = new MessageIdempotent();
           messageIdempotent.setMessageId(messageId);
           messageIdempotent.setTopic(topic);
           messageIdempotent.setConsumerGroup(consumerGroup);
           messageIdempotent.setConsumeStatus(1);
           boolean saveResult = this.save(messageIdempotent);
           transactionManager.commit(status);
           return saveResult;
       } catch (Exception e) {
           transactionManager.rollback(status);
           log.error("标记消息消费状态异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup, e);
           return false;
       }
   }
}

4.6.3 订单服务接口

package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.entity.Order;
/**
* 订单服务接口
* @author ken
* @date 2026-03-03
*/

public interface OrderService extends IService<Order> {
   /**
    * 创建订单
    * @param orderMessageDTO 订单消息DTO
    * @return 操作结果
    */

   boolean createOrder(OrderMessageDTO orderMessageDTO);
}

4.6.4 订单服务实现类

package com.jam.demo.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;
/**
* 订单服务实现类
* @author ken
* @date 2026-03-03
*/

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
   private final PlatformTransactionManager transactionManager;
   @Override
   public boolean createOrder(OrderMessageDTO orderMessageDTO) {
       if (!StringUtils.hasText(orderMessageDTO.getOrderNo()) || orderMessageDTO.getUserId() == null || orderMessageDTO.getAmount() == null) {
           log.error("创建订单参数异常,orderMessageDTO:{}", orderMessageDTO);
           return false;
       }
       DefaultTransactionDefinition def = new DefaultTransactionDefinition();
       def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
       TransactionStatus status = transactionManager.getTransaction(def);
       try {
           Order order = new Order();
           order.setOrderNo(orderMessageDTO.getOrderNo());
           order.setUserId(orderMessageDTO.getUserId());
           order.setAmount(orderMessageDTO.getAmount());
           order.setOrderStatus(orderMessageDTO.getOrderStatus());
           boolean saveResult = this.save(order);
           transactionManager.commit(status);
           log.info("创建订单成功,orderNo:{}", orderMessageDTO.getOrderNo());
           return saveResult;
       } catch (Exception e) {
           transactionManager.rollback(status);
           log.error("创建订单异常,orderNo:{}", orderMessageDTO.getOrderNo(), e);
           return false;
       }
   }
}

4.7 消息生产者服务

package com.jam.demo.producer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.dto.OrderMessageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
/**
* RocketMQ消息生产者服务
* @author ken
* @date 2026-03-03
*/

@Slf4j
@Service
@RequiredArgsConstructor
public class RocketMQProducerService {
   private final RocketMQTemplate rocketMQTemplate;
   /**
    * 同步发送订单消息
    * @param topic 消息主题
    * @param orderMessageDTO 订单消息DTO
    * @return 发送结果
    */

   public boolean syncSendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
       if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
           log.error("同步发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
           return false;
       }
       try {
           rocketMQTemplate.syncSend(topic, JSON.toJSONString(orderMessageDTO));
           log.info("同步发送订单消息成功,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo());
           return true;
       } catch (Exception e) {
           log.error("同步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
           return false;
       }
   }
   /**
    * 异步发送订单消息
    * @param topic 消息主题
    * @param orderMessageDTO 订单消息DTO
    */

   public void asyncSendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
       if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
           log.error("异步发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
           return;
       }
       try {
           rocketMQTemplate.asyncSend(topic, JSON.toJSONString(orderMessageDTO), new org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback() {
               @Override
               public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
                   log.info("异步发送订单消息成功,topic:{}, orderNo:{}, msgId:{}", topic, orderMessageDTO.getOrderNo(), sendResult.getMsgId());
               }
               @Override
               public void onException(Throwable e) {
                   log.error("异步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
               }
           });
       } catch (Exception e) {
           log.error("异步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
       }
   }
   /**
    * 单向发送订单消息
    * @param topic 消息主题
    * @param orderMessageDTO 订单消息DTO
    */

   public void oneWaySendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
       if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
           log.error("单向发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
           return;
       }
       try {
           rocketMQTemplate.sendOneWay(topic, JSON.toJSONString(orderMessageDTO));
           log.info("单向发送订单消息完成,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo());
       } catch (Exception e) {
           log.error("单向发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
       }
   }
}

4.8 消息消费者服务(含幂等处理)

package com.jam.demo.consumer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.service.MessageIdempotentService;
import com.jam.demo.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
/**
* 订单消息消费者
* @author ken
* @date 2026-03-03
*/

@Slf4j
@Service
@RequiredArgsConstructor
@RocketMQMessageListener(
       topic = "order_topic",
       consumerGroup = "order_consumer_group",
       selectorExpression = "*"
)
public class OrderMessageConsumer implements RocketMQListener<String> {
   private final MessageIdempotentService messageIdempotentService;
   private final OrderService orderService;
   private static final String TOPIC = "order_topic";
   private static final String CONSUMER_GROUP = "order_consumer_group";
   @Override
   public void onMessage(String message) {
       if (!StringUtils.hasText(message)) {
           log.warn("收到空消息,直接跳过");
           return;
       }
       // 获取消息上下文与messageId
       org.apache.rocketmq.common.message.MessageExt messageExt = org.apache.rocketmq.spring.support.RocketMQUtil.getMessageExt();
       String messageId = messageExt.getMsgId();
       log.info("收到订单消息,messageId:{}, message:{}", messageId, message);
       // 幂等校验
       boolean isConsumed = messageIdempotentService.checkMessageConsumed(messageId, TOPIC, CONSUMER_GROUP);
       if (isConsumed) {
           log.warn("消息已消费,直接跳过,messageId:{}", messageId);
           return;
       }
       // 解析消息
       OrderMessageDTO orderMessageDTO;
       try {
           orderMessageDTO = JSON.parseObject(message, OrderMessageDTO.class);
       } catch (Exception e) {
           log.error("消息解析异常,messageId:{}, message:{}", messageId, message, e);
           throw new RuntimeException("消息解析异常", e);
       }
       // 业务处理
       boolean createResult = orderService.createOrder(orderMessageDTO);
       if (!createResult) {
           log.error("订单创建失败,消息将重试,messageId:{}, orderNo:{}", messageId, orderMessageDTO.getOrderNo());
           throw new RuntimeException("订单创建失败,触发重试");
       }
       // 标记消息为已消费
       boolean markResult = messageIdempotentService.markMessageConsumed(messageId, TOPIC, CONSUMER_GROUP);
       if (!markResult) {
           log.error("标记消息消费状态失败,messageId:{}", messageId);
           throw new RuntimeException("标记消息消费状态失败,触发重试");
       }
       log.info("订单消息消费完成,messageId:{}, orderNo:{}", messageId, orderMessageDTO.getOrderNo());
   }
}

4.9 接口控制器

package com.jam.demo.controller;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.producer.RocketMQProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
/**
* 订单消息控制器
* @author ken
* @date 2026-03-03
*/

@Slf4j
@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
@Tag(name = "订单消息接口", description = "RocketMQ订单消息发送接口")
public class OrderMessageController {
   private final RocketMQProducerService rocketMQProducerService;
   private static final String ORDER_TOPIC = "order_topic";
   @PostMapping("/sync/send")
   @Operation(summary = "同步发送订单消息", description = "同步发送订单消息到RocketMQ,等待发送结果")
   public ResponseEntity<Boolean> syncSendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
       boolean result = rocketMQProducerService.syncSendOrderMessage(ORDER_TOPIC, orderMessageDTO);
       return ResponseEntity.ok(result);
   }
   @PostMapping("/async/send")
   @Operation(summary = "异步发送订单消息", description = "异步发送订单消息到RocketMQ,不阻塞主线程")
   public ResponseEntity<String> asyncSendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
       rocketMQProducerService.asyncSendOrderMessage(ORDER_TOPIC, orderMessageDTO);
       return ResponseEntity.ok("异步消息发送请求已提交");
   }
   @PostMapping("/oneway/send")
   @Operation(summary = "单向发送订单消息", description = "单向发送订单消息到RocketMQ,不等待返回结果")
   public ResponseEntity<String> oneWaySendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
       rocketMQProducerService.oneWaySendOrderMessage(ORDER_TOPIC, orderMessageDTO);
       return ResponseEntity.ok("单向消息发送完成");
   }
}

4.10 项目启动类

package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
/**
* RocketMQ Demo启动类
* @author ken
* @date 2026-03-03
*/

@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
@OpenAPIDefinition(
       info = @Info(
               title = "RocketMQ Demo API",
               version = "1.0",
               description = "RocketMQ Spring Boot 整合示例API文档"
       )
)
public class RocketmqDemoApplication {
   public static void main(String[] args) {
       SpringApplication.run(RocketmqDemoApplication.class, args);
   }
}

五、生产环境最佳实践与常见问题排查

5.1 消息可靠性保障最佳实践

生产端

  1. 核心消息优先使用同步发送,避免使用单向发送,确保消息发送结果可感知。
  2. 开启合理的发送重试,设置重试次数为2-3次,配合故障规避机制,自动切换到健康的Broker节点。
  3. 发送失败的消息需做好兜底处理,存入本地数据库,定时任务重试,避免消息丢失。
  4. 分布式事务场景必须使用RocketMQ原生事务消息机制,保证本地事务与消息发送的原子性。

Broker端

  1. 金融级场景使用同步复制+同步刷盘,普通核心场景使用Dledger模式+异步刷盘,平衡可靠性与性能。
  2. 生产环境关闭自动创建Topic和订阅组,提前规划并创建Topic,避免权限失控。
  3. 合理设置消息保留时间,默认3天,根据业务需求调整,避免磁盘空间耗尽。
  4. 开启消息轨迹,方便消息全链路追踪与问题排查。

消费端

  1. 消费失败必须抛出异常,触发RocketMQ的重试机制,禁止捕获异常后不处理导致消息丢失。
  2. 必须实现消费幂等,RocketMQ保证消息至少投递一次,重复投递是常态,幂等是消费端的必备能力。
  3. 死信队列必须配置监控告警,及时处理消费失败的死信消息,避免业务异常。
  4. 消费逻辑避免耗时操作,优化数据库、远程调用等逻辑,提升消费速度,避免消息堆积。

5.2 性能优化最佳实践

  1. 批量发送与消费:非实时性消息使用批量发送,减少网络IO次数,提升吞吐量;消费端开启批量拉取,减少RPC调用次数。
  2. 合理设置队列数量:单个Topic的队列数量建议与Broker的CPU核心数相当,过多的队列会导致ConsumeQueue文件过多,影响IO性能。
  3. JVM优化:合理设置堆内存与堆外内存,避免频繁GC;新生代大小设置为堆内存的50%,提升短生命周期对象的回收效率。
  4. 磁盘优化:使用SSD固态硬盘,CommitLog、ConsumeQueue、IndexFile分别挂载到不同的磁盘,避免IO竞争。
  5. 消息压缩:大于4KB的消息开启压缩,减少网络传输量,提升发送与消费性能。

5.3 常见问题排查

1. 消息堆积

  • 排查思路:先确认消费端是否正常运行,是否有报错;再查看消费逻辑的耗时,是否消费速度跟不上生产速度;最后查看Topic的队列数量与消费者数量是否匹配。
  • 解决方案:优化消费逻辑,减少耗时操作;增加消费者数量,提升消费能力;临时增加Topic的队列数量,提升并行消费能力;对于历史堆积消息,可通过重置消费位点跳过,单独处理。

2. 消息丢失

  • 排查思路:从生产端、Broker端、消费端全链路排查,生产端是否发送成功,Broker端是否配置了异步刷盘+异步复制,消费端是否捕获了异常未抛出导致消息被标记为已消费。
  • 解决方案:生产端使用同步发送,确认发送结果;Broker端根据业务需求配置合理的刷盘与复制策略;消费端消费失败必须抛出异常,禁止静默处理。

3. 消息重复消费

  • 排查思路:RocketMQ的重试机制、重平衡、消费者重启都会导致消息重复投递,核心原因是消费端未实现幂等处理。
  • 解决方案:基于消息唯一ID实现数据库唯一索引幂等,或者基于业务唯一键实现幂等,确保同一条消息多次消费结果一致。

六、总结

RocketMQ 作为国内最主流的分布式消息中间件,其优秀的架构设计、金融级的可靠性、超高的性能,完全能够满足绝大多数企业级业务场景的需求。本文从核心底层原理,到生产级集群部署,再到业务实战,全面讲解了RocketMQ的核心知识,所有内容均基于官方权威文档与生产实践,确保准确无误。

掌握RocketMQ的核心,本质上是理解其“高性能存储设计”、“可靠性保障机制”、“分布式高可用架构”三大核心,只有吃透底层逻辑,才能在生产环境中用好RocketMQ,避免踩坑,充分发挥其性能与可靠性优势。

目录
相关文章
|
7天前
|
人工智能 自然语言处理 JavaScript
2026年Windows+Ollama本地部署OpenClaw保姆级教程:本地AI Agent+阿里云上快速搭建
2026年OpenClaw凭借本地部署、私有化运行的特性,成为打造个人智能体的核心工具,而Ollama作为轻量级本地大模型管理工具,能让OpenClaw摆脱对云端大模型的依赖,实现**本地推理、数据不泄露、全流程私有化**的智能体验。本文基于Windows 11系统,从硬件环境准备、Ollama安装与模型定制、OpenClaw部署配置、技能扩展到常见问题排查,打造保姆级本地部署教程,同时补充阿里云OpenClaw(Clawdbot)快速部署步骤,兼顾本地私有化需求与云端7×24小时运行需求,文中所有代码命令均可直接复制执行,确保零基础用户也能快速搭建属于自己的本地智能体。
8282 67
|
4天前
|
人工智能 安全 API
CoPaw:3分钟部署你的 AI助理
源自阿里巴巴开源生态的个人 AI 助理——CoPaw。作为阿里倾力打造的开源力作,CoPaw 完美打通钉钉、飞书、Discord 等多平台对话通道,支持定时任务自动化。内置 PDF/Office 深度处理、新闻摘要等强大技能,更开放自定义扩展接口。坚持数据全程私有化部署,绝不上传云端,让每一位用户都能在大厂技术加持下,拥有安全、专属的智能助手。
|
5天前
|
人工智能 自然语言处理 机器人
保姆级教程:Mac本地搭建OpenClaw及阿里云上1分钟部署OpenClaw+飞书集成实战指南
OpenClaw(曾用名Clawdbot、Moltbot)作为2026年最热门的开源个人AI助手平台,以“自然语言驱动自动化”为核心,支持对接飞书、Telegram等主流通讯工具,可替代人工完成文件操作、日历管理、邮件处理等重复性工作。其模块化架构适配多系统环境,既可以在Mac上本地化部署打造私人助手,也能通过阿里云实现7×24小时稳定运行,完美兼顾隐私性与便捷性。
3800 7
|
4天前
|
人工智能 安全 JavaScript
阿里云上+本地部署OpenClaw(小龙虾)新手攻略:解锁10大必备Skills,零基础也能玩转AI助手
2026年,开源AI代理工具OpenClaw(昵称“小龙虾”)凭借“能实际做事”的核心优势,在GitHub斩获25万+星标,成为现象级AI工具。它最强大的魅力在于可扩展的Skills(技能包)系统——通过ClawHub插件市场的数百个技能,能让AI助手从简单聊天升级为处理办公、学习、日常事务的全能帮手。
3526 8
|
7天前
|
人工智能 JSON JavaScript
手把手教你用 OpenClaw + 飞书,打造专属 AI 机器人
手把手教你用 OpenClaw(v2026.2.22-2)+ 飞书,10分钟零代码搭建专属AI机器人!内置飞书插件,无需额外安装;支持Claude等主流模型,命令行一键配置。告别复杂开发,像聊同事一样自然对话。
4195 13
手把手教你用 OpenClaw + 飞书,打造专属 AI 机器人
|
6天前
|
人工智能 监控 机器人
2026年零门槛部署 OpenClaw(Clawdbot)接入A股数据,实现24小时股票分析保姆级教程
在AI赋能金融分析的浪潮中,OpenClaw(原Clawdbot/Moltbot)凭借开源灵活的架构,成为个人投资者打造专属智能分析助手的首选。通过接入A股实时数据,它能实现24小时市场监控、涨跌预警、潜力股推荐等核心功能,彻底解放人工盯盘的繁琐。而阿里云的稳定部署环境,更让这套系统实现全天候不间断运行,成为真正的“金融AI助手”。 本文基于OpenClaw v2026.1.25稳定版与QVeris免费A股数据接口,详细拆解阿里云OpenClaw部署步骤、A股数据接入流程、高级分析功能配置及多平台联动技巧,所有代码命令均可直接复制复用,即使无技术基础也能在1小时内完成从部署到实战的全流程。
3032 11
|
9天前
|
存储 人工智能 BI
2026年OpenClaw(Clawdbot)极简部署:接入小红书全自动运营,一个人=一支团队
2026年的小红书运营赛道,AI自动化工具已成为核心竞争力。OpenClaw(原Clawdbot)凭借“Skill插件化集成、全流程自动化、跨平台联动”的核心优势,彻底颠覆传统运营模式——从热点追踪、文案创作、封面设计到自动发布、账号互动,仅需一句自然语言指令,即可实现全链路闭环。而阿里云作为OpenClaw官方推荐的云端部署载体,2026年推出专属秒级部署方案,预装全套运行环境与小红书运营插件,让零基础用户也能10分钟完成部署,轻松拥有7×24小时在线的“专属运营团队”。
2771 11