前言
Apache RocketMQ 是历经双十一万亿级流量考验的分布式消息与流处理平台,现已成为 Apache 顶级项目,凭借低延迟、高吞吐、金融级可靠性和云原生架构,成为国内微服务、事件驱动架构中的主流消息中间件。本文将从底层核心原理到生产级集群部署,再到业务实战,用通俗语言讲透底层逻辑,兼顾深度与可读性,让读者既能夯实基础,又能直接解决生产实际问题。
一、RocketMQ 核心架构与角色职责
RocketMQ 采用经典的发布-订阅架构,核心由四大组件构成,各组件职责清晰、协同工作,整体架构如下:
1.1 NameServer:轻量级路由注册中心
NameServer 是 RocketMQ 的“路由中枢”,采用无状态设计,集群节点间互不通信,核心职责如下:
- 路由信息管理:维护 Broker 集群的元数据,包括 Broker 地址、主从角色、Topic 与队列的映射关系,支持 O(1) 时间复杂度的路由查询。
- Broker 注册与心跳检测:Broker 启动后会每隔 30 秒向所有 NameServer 节点上报自身状态;NameServer 若 120 秒未收到 Broker 心跳,会将该节点从路由表中剔除。
- 客户端路由服务:Producer 和 Consumer 启动时会从 NameServer 拉取路由信息,并定时更新,实现动态服务发现。
1.2 Broker:消息存储与转发核心
Broker 是 RocketMQ 的“心脏”,承担消息的存储、投递、查询和高可用控制,是整个系统性能和可靠性的核心载体,核心职责:
- 消息持久化存储:通过 CommitLog、ConsumeQueue、IndexFile 三大文件实现消息的高性能存储与快速检索。
- 消息投递:处理 Consumer 的拉取请求,将消息推送给消费者,维护消费进度。
- 高可用保障:支持主从架构,通过同步/异步复制实现数据多副本;基于 Dledger Raft 协议实现自动主从切换,保障服务持续可用。
- 功能调度:实现事务消息回查、延时消息、重试与死信队列等核心功能。
1.3 Producer:消息生产者
Producer 负责将业务消息发送到 Broker 集群,支持同步、异步、单向三种发送方式,内置负载均衡、失败重试、故障规避机制,保障消息发送的可靠性。
1.4 Consumer:消息消费者
Consumer 负责从 Broker 拉取消息并执行业务逻辑,支持集群消费和广播消费两种模式,内置负载均衡、重平衡、消费重试机制,保障消息消费的完整性。
二、RocketMQ 核心底层原理
2.1 消息存储核心原理
RocketMQ 的高性能和高可靠,核心源于其优秀的存储设计,采用“统一日志存储+逻辑索引分离”的架构,彻底解决了传统消息队列多 Topic 下的性能衰减问题。
2.1.1 三大核心存储文件
1. CommitLog:消息主体存储文件
所有 Topic 的所有消息都会顺序追加写入 CommitLog,彻底利用机械硬盘的顺序写入高性能特性,SSD 盘性能更优。
- 单个文件默认大小 1GB,文件名由 20 位数字组成,代表文件的起始物理偏移量,例如
00000000000000000000,下一个文件为00000000001073741824。 - 消息写入完全顺序,读取时通过偏移量随机访问,配合 PageCache 和零拷贝技术,实现超高的读写性能。
2. ConsumeQueue:消费逻辑索引
ConsumeQueue 是消息的逻辑索引,每个 Topic 的每个队列对应一个 ConsumeQueue 文件,解决了消费者全量扫描 CommitLog 的性能问题。
- 单个文件默认包含 30 万条条目,每个条目固定 20 字节,结构为:
8字节CommitLog物理偏移量 + 4字节消息大小 + 8字节Tag哈希值,单个文件大小约 5.72MB。 - Consumer 消费时,先从 ConsumeQueue 获取消息的物理偏移量,再根据偏移量从 CommitLog 读取完整消息,实现精准的消息检索。
3. IndexFile:消息索引文件
IndexFile 用于根据 Message Key 快速查询消息,采用哈希索引结构,实现毫秒级的消息查询。
- 单个 IndexFile 包含 40 字节文件头、500 万个哈希槽(每个4字节)、2000 万个索引条目(每个20字节),单个文件大小约 400MB。
- 当需要根据 Message Key 检索消息时,先计算 Key 的哈希值,找到对应的哈希槽,再通过索引条目获取消息的 CommitLog 偏移量,最终读取完整消息。
2.1.2 高性能存储优化机制
1. 零拷贝技术
RocketMQ 采用内存映射(MMAP)+ Sendfile 零拷贝技术,减少用户态与内核态之间的数据拷贝,大幅提升消息读写性能:
- MMAP:将 CommitLog 文件直接映射到用户态内存,操作内存等同于操作文件,避免了内核缓冲区到用户缓冲区的拷贝。
- Sendfile:消息网络传输时,直接从内核缓冲区发送到网卡,无需经过用户态,彻底减少数据拷贝次数。
2. PageCache 页缓存优化
消息写入时优先写入操作系统的 PageCache,而非直接刷盘,由操作系统异步将脏页刷写到磁盘;消息读取时,若 PageCache 中存在该数据,直接从内存返回,无需磁盘 IO。
- 顺序写入的特性让 PageCache 预读机制效果最大化,大幅提升读取性能。
- 冷热数据分离,热数据常驻 PageCache,实现内存级的读写性能。
2.1.3 刷盘机制:平衡性能与可靠性
RocketMQ 提供两种刷盘策略,用户可根据业务对可靠性和性能的需求灵活选择。
| 刷盘模式 | 核心逻辑 | 优点 | 缺点 | 适用场景 |
| 同步刷盘(SYNC_FLUSH) | 消息写入 PageCache 后,立即调用 fsync 强制刷写到磁盘,刷盘成功后才返回发送成功 | 数据可靠性极高,机器宕机不会丢失消息 | 写入性能有损耗,延迟增加 | 金融、支付等对数据零丢失要求极高的场景 |
| 异步刷盘(ASYNC_FLUSH) | 消息写入 PageCache 后立即返回发送成功,后台线程每隔固定时间(默认500ms)批量刷盘 | 写入性能极高,吞吐量和延迟表现优秀 | 机器宕机时会丢失 PageCache 中未刷盘的消息 | 大部分互联网业务,对性能要求高,允许极少量消息丢失的场景 |
2.1.4 主从复制与高可用
RocketMQ 提供两种主从数据复制模式,配合刷盘机制,实现不同等级的数据可靠性保障。
| 复制模式 | 核心逻辑 | 优点 | 缺点 |
| 同步复制 | Master 收到消息后,需等待 Slave 同步完成并返回 ACK,才会给 Producer 返回发送成功 | 主备数据完全一致,Master 宕机后数据零丢失,Slave 可提供读服务 | 消息发送延迟增加,吞吐量有一定损耗 |
| 异步复制 | Master 收到消息写入成功后,立即返回发送成功,后台异步将数据同步给 Slave | 发送延迟低,吞吐量高,性能损耗极小 | Master 宕机时,Slave 可能未同步完数据,会丢失少量消息 |
2.2 消息发送核心原理
2.2.1 三种消息发送方式
RocketMQ 提供三种消息发送方式,适配不同的业务场景:
- 同步发送:Producer 发送消息后,阻塞等待 Broker 的返回结果,直到发送成功或超时失败。可靠性最高,适用于订单、支付等核心消息场景。
- 异步发送:Producer 发送消息时注册回调函数,非阻塞等待 Broker 的返回结果,发送线程不会阻塞。适用于吞吐量要求高、需要处理发送结果的场景。
- 单向发送:Producer 发送消息后,不等待 Broker 的返回结果,也不注册回调,发送后立即返回。性能最高,可靠性最低,适用于日志收集等非核心消息场景。
2.2.2 消息发送全流程
消息发送的完整流程如下,内置负载均衡、故障重试机制,保障消息发送的可靠性:
2.2.3 事务消息原理
RocketMQ 原生支持事务消息,基于两阶段提交+事务回查机制,实现本地事务与消息发送的原子性,彻底解决分布式事务问题。
事务消息完整流程:
- 半消息发送:Producer 发送半消息(Half Message)到 Broker,Broker 将消息存储到系统内置的半消息主题中,此时消息对消费者不可见,返回发送成功给 Producer。
- 本地事务执行:Producer 收到半消息发送成功的响应后,执行本地事务逻辑。
- 二次确认提交:Producer 根据本地事务执行结果,向 Broker 发送二次确认指令:
- 事务执行成功:发送 Commit 指令,Broker 将半消息从半消息主题转移到真实业务主题,对消费者可见。
- 事务执行失败:发送 Rollback 指令,Broker 直接删除半消息,不会投递给消费者。
- 事务状态回查:若发生网络异常,Broker 未收到二次确认指令,Broker 会定时回查 Producer 的本地事务状态(默认间隔1分钟,最多回查15次),若超过回查次数仍无法确认,Broker 将自动 Rollback 消息。
2.3 消息消费核心原理
2.3.1 两种消费模式
RocketMQ 提供两种消费模式,适配不同的业务场景:
- 集群消费:同一个消费组(Consumer Group)的多个消费者,共同消费 Topic 的队列,一个队列只会被同一个消费组的一个消费者消费,实现负载均衡,消息只会被消费一次。这是生产环境最常用的消费模式。
- 广播消费:同一个消费组的所有消费者,都会消费 Topic 的所有队列,每条消息都会被消费组内的每一个消费者消费一次。适用于配置刷新、全局通知等场景。
2.3.2 推模式底层实现
RocketMQ 的推模式(Push Consumer)本质上是对拉模式的封装,底层采用长轮询机制实现:
- Consumer 主动向 Broker 发起拉取请求,若 Broker 有对应消息,立即返回消息给 Consumer。
- 若 Broker 没有对应消息,不会立即返回空结果,而是 hold 住该请求,直到有新消息到达或长轮询超时(默认15秒),再返回响应给 Consumer。
- 长轮询机制既保证了消息消费的实时性,又避免了 Consumer 频繁轮询带来的性能损耗。
2.3.3 消费重试与死信队列
RocketMQ 提供完善的消费重试与死信机制,保障消息不会因消费失败而丢失:
- 消费重试:集群消费模式下,消息消费失败后,Consumer 抛出异常会触发重试,消息会被发送到重试主题(
%RETRY%+消费组名),每个消费组有独立的重试队列。重试次数默认16次,每次重试的间隔时间递增,从1秒到2小时不等。 - 死信队列:当消息重试次数超过最大值(默认16次)后,消息会被投递到死信主题(
%DLQ%+消费组名),死信队列中的消息不会再被自动消费,需要人工干预处理,避免消息无限重试占用系统资源。
2.4 负载均衡与重平衡原理
2.4.1 Producer 端负载均衡
Producer 从 NameServer 获取 Topic 的路由信息后,默认采用轮询策略将消息发送到不同的队列,实现 Broker 集群的负载均衡。同时支持自定义队列选择策略,例如根据 Message Key 哈希取模,保证同一个 Key 的消息发送到同一个队列,实现严格的顺序消息。
2.4.2 Consumer 端负载均衡与重平衡
集群消费模式下,同一个消费组的消费者会对 Topic 的队列进行分配,核心规则是一个队列只能被同一个消费组的一个消费者消费,一个消费者可以消费多个队列。
RocketMQ 提供多种队列分配策略,默认采用平均分配策略,同时支持环形分配、一致性哈希、机房就近分配等策略。
当消费组内的消费者数量发生变化(上下线)、Topic 的队列数量发生变化时,会触发重平衡(Rebalance),重新分配队列,保证消费的负载均衡。重平衡完整流程如下:
三、RocketMQ 生产级集群部署实战
3.1 集群模式选型
生产环境需根据业务的可靠性、性能、可用性要求,选择合适的集群模式,各模式对比与适用场景如下:
| 集群模式 | 核心优势 | 核心劣势 | 适用场景 |
| 多Master多Slave(异步复制) | 性能极高,吞吐量优秀,无单点故障,Master宕机后可从Slave继续消费 | Master宕机时会丢失少量未同步的消息 | 绝大多数互联网核心业务,对性能要求高,允许极少量消息丢失 |
| 多Master多Slave(同步复制) | 数据零丢失,金融级可靠性,无单点故障,Master宕机后数据完整 | 发送延迟更高,吞吐量有一定损耗 | 金融、支付、交易等对数据可靠性要求极高的场景 |
| Dledger Raft模式 | 自动主从切换,真正的高可用,数据强一致,无需人工干预故障转移 | 部署复杂度略高,性能比异步复制略有损耗 | 生产核心业务,要求故障自动转移、服务零中断的场景 |
本文以3节点Dledger Raft集群为例,讲解生产级集群的完整部署流程,基于Apache RocketMQ 5.4.0,适配 JDK 17,实现自动主从切换、数据强一致、高可用。
3.2 环境准备
3.2.1 机器规划
| 机器IP | 部署角色 | 核心端口 |
| 192.168.1.101 | NameServer、Broker | 9876、10911、10909、40911 |
| 192.168.1.102 | NameServer、Broker | 9876、10911、10909、40911 |
| 192.168.1.103 | NameServer、Broker | 9876、10911、10909、40911 |
3.2.2 软件与硬件要求
- 操作系统:CentOS 7.9+/Ubuntu 22.04+,生产环境优先选用稳定版 Linux 系统。
- JDK:JDK 17 ,RocketMQ 5.4.0 完全适配 JDK 17。
- RocketMQ:Apache RocketMQ 5.4.0 。
- 硬件要求:CPU 8核16线程以上,内存32G以上,SSD固态硬盘1TB以上,万兆网卡。
3.2.3 操作系统优化
# 1. 关闭防火墙和SELinux
systemctl stop firewalld && systemctl disable firewalld
setenforce 0 && sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
# 2. 优化文件句柄数
echo "* soft nofile 655350" >> /etc/security/limits.conf
echo "* hard nofile 655350" >> /etc/security/limits.conf
echo "* soft nproc 655350" >> /etc/security/limits.conf
echo "* hard nproc 655350" >> /etc/security/limits.conf
# 3. 虚拟内存优化,关闭swap分区
echo "vm.swappiness=0" >> /etc/sysctl.conf
sysctl -p
# 4. 关闭透明大页,避免内存碎片化
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
3.3 集群部署步骤
步骤1:JDK 17 安装与配置(所有机器执行)
# 下载JDK 17最新稳定版
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
# 解压到指定目录
tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/
# 配置环境变量
cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk-17.0.12
export PATH=\$JAVA_HOME/bin:\$PATH
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
EOF
# 生效环境变量
source /etc/profile
# 验证安装
java -version
步骤2:RocketMQ 安装与环境配置(所有机器执行)
# 下载Apache RocketMQ 5.4.0最新稳定版
wget https://archive.apache.org/dist/rocketmq/5.4.0/rocketmq-all-5.4.0-bin-release.zip
# 解压
unzip rocketmq-all-5.4.0-bin-release.zip -d /usr/local/
# 重命名
mv /usr/local/rocketmq-all-5.4.0-bin-release /usr/local/rocketmq
# 配置环境变量
cat >> /etc/profile << EOF
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=\$ROCKETMQ_HOME/bin:\$PATH
EOF
# 生效环境变量
source /etc/profile
# 创建消息存储目录
mkdir -p /data/rocketmq/store
chmod -R 777 /data/rocketmq/store
步骤3:JVM 参数优化(适配JDK 17,所有机器执行)
修改 NameServer JVM 配置文件 bin/runserver.sh:
vi /usr/local/rocketmq/bin/runserver.sh
# 修改核心JVM参数
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# JDK 17 反射权限适配参数
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.lang=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.math=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.io=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.nio=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.net=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util.concurrent=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.base/sun.nio.cs=ALL-UNNAMED"
JAVA_OPT="${JAVA_OPT} --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
修改 Broker JVM 配置文件 bin/runbroker.sh:
vi /usr/local/rocketmq/bin/runbroker.sh
# 修改核心JVM参数
JAVA_OPT="${JAVA_OPT} -server -Xms16g -Xmx16g -Xmn8g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"
# JDK 17 反射权限适配参数(同runserver.sh,添加相同的--add-opens参数)
步骤4:Broker 配置文件编写(Dledger模式)
192.168.1.101 机器配置文件 conf/dledger/broker-n0.conf:
brokerClusterName=RocketMQ-Dledger-Cluster
brokerName=RaftNode00
brokerId=-1
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876
listenPort=10911
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex=/data/rocketmq/store/index
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=32
pullMessageThreadPoolNums=32
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.1.101:40911;n1-192.168.1.102:40911;n2-192.168.1.103:40911
dLegerSelfId=n0
enableTrace=true
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
192.168.1.102 机器配置文件 conf/dledger/broker-n1.conf,仅修改 dLegerSelfId=n1,其余配置完全一致。 192.168.1.103 机器配置文件 conf/dledger/broker-n2.conf,仅修改 dLegerSelfId=n2,其余配置完全一致。
步骤5:启动 NameServer 集群(所有机器执行)
# 后台启动NameServer
nohup sh /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/namesrv.log 2>&1 &
# 查看启动日志,出现"The Name Server boot success"表示启动成功
tail -f /usr/local/rocketmq/logs/namesrv.log
步骤6:启动 Broker 集群(对应机器执行)
192.168.1.101 机器:
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n0.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &
192.168.1.102 机器:
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n1.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &
192.168.1.103 机器:
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-n2.conf > /usr/local/rocketmq/logs/broker.log 2>&1 &
查看启动日志,出现The broker boot success表示启动成功。
步骤7:集群状态验证
# 查看集群节点状态
sh /usr/local/rocketmq/bin/mqadmin clusterList -n 192.168.1.101:9876
执行后可看到3个Broker节点,其中1个为Master,另外2个为Slave,集群部署成功。
步骤8:RocketMQ Dashboard 可视化控制台部署
# 下载RocketMQ Dashboard 2.0.0最新稳定版
wget https://archive.apache.org/dist/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0.jar
# 后台启动Dashboard
nohup java -jar rocketmq-dashboard-2.0.0.jar --rocketmq.config.namesrvAddr=192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876 --server.port=8080 > dashboard.log 2>&1 &
启动后访问 http://机器IP:8080 即可进入可视化控制台,管理集群、Topic、消息、消费者等资源。
四、Spring Boot 3.x 整合 RocketMQ 实战
本实战基于 JDK 17、Spring Boot 3.3.5、RocketMQ 5.4.0,包含完整的消息发送、消费、幂等处理、业务落地全流程。
4.1 项目依赖配置 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.5</version>
<relativePath/>
</parent>
<groupId>com.jam.demo</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-demo</name>
<description>RocketMQ Spring Boot Demo</description>
<properties>
<java.version>17</java.version>
<rocketmq.version>2.5.0</rocketmq.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
<fastjson2.version>2.0.53</fastjson2.version>
<guava.version>33.2.0-jre</guava.version>
<lombok.version>1.18.34</lombok.version>
<springdoc.version>2.6.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 应用配置文件 application.yml
server:
port: 8081
spring:
application:
name: rocketmq-demo
datasource:
url: jdbc:mysql://127.0.0.1:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
rocketmq:
name-server: 192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876
producer:
group: demo-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
configuration:
map-underscore-to-camel-case: true
global-config:
db-config:
id-type: auto
springdoc:
api-docs:
enabled: true
path: /v3/api-docs
swagger-ui:
enabled: true
path: /swagger-ui.html
4.3 MySQL 数据库表结构(MySQL 8.0 可直接执行)
CREATE DATABASE IF NOT EXISTS rocketmq_demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE rocketmq_demo;
-- 消息消费幂等表
DROP TABLE IF EXISTS t_message_idempotent;
CREATE TABLE t_message_idempotent (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
message_id VARCHAR(64) NOT NULL COMMENT '消息唯一ID',
topic VARCHAR(128) NOT NULL COMMENT '消息主题',
consumer_group VARCHAR(128) NOT NULL COMMENT '消费组',
consume_status TINYINT NOT NULL DEFAULT 0 COMMENT '消费状态 0-未消费 1-已消费',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_msg_id (message_id, topic, consumer_group) COMMENT '消息唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息消费幂等表';
-- 订单业务表
DROP TABLE IF EXISTS t_order;
CREATE TABLE t_order (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '订单ID',
order_no VARCHAR(32) NOT NULL COMMENT '订单编号',
user_id BIGINT NOT NULL COMMENT '用户ID',
amount DECIMAL(10,2) NOT NULL COMMENT '订单金额',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态 0-待支付 1-已支付 2-已取消',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_order_no (order_no) COMMENT '订单编号唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';
4.4 核心实体类
4.4.1 消息幂等实体类
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息消费幂等实体类
* @author ken
* @date 2026-03-03
*/
@Data
@TableName("t_message_idempotent")
@Schema(description = "消息消费幂等实体")
public class MessageIdempotent {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "消息唯一ID")
private String messageId;
@Schema(description = "消息主题")
private String topic;
@Schema(description = "消费组")
private String consumerGroup;
@Schema(description = "消费状态 0-未消费 1-已消费")
private Integer consumeStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
4.4.2 订单实体类
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体类
* @author ken
* @date 2026-03-03
*/
@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order {
@TableId(type = IdType.AUTO)
@Schema(description = "订单ID")
private Long id;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "订单金额")
private BigDecimal amount;
@Schema(description = "订单状态 0-待支付 1-已支付 2-已取消")
private Integer orderStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
4.4.3 订单消息DTO
package com.jam.demo.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
/**
* 订单消息DTO
* @author ken
* @date 2026-03-03
*/
@Data
@Schema(description = "订单消息DTO")
public class OrderMessageDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "订单金额")
private BigDecimal amount;
@Schema(description = "订单状态")
private Integer orderStatus;
}
4.5 Mapper 数据访问层
4.5.1 消息幂等Mapper
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.MessageIdempotent;
import org.apache.ibatis.annotations.Mapper;
/**
* 消息幂等Mapper
* @author ken
* @date 2026-03-03
*/
@Mapper
public interface MessageIdempotentMapper extends BaseMapper<MessageIdempotent> {
}
4.5.2 订单Mapper
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper
* @author ken
* @date 2026-03-03
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
4.6 Service 业务逻辑层
4.6.1 消息幂等服务接口
package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.entity.MessageIdempotent;
/**
* 消息幂等服务接口
* @author ken
* @date 2026-03-03
*/
public interface MessageIdempotentService extends IService<MessageIdempotent> {
/**
* 检查消息是否已经消费
* @param messageId 消息唯一ID
* @param topic 消息主题
* @param consumerGroup 消费组
* @return true-已消费 false-未消费
*/
boolean checkMessageConsumed(String messageId, String topic, String consumerGroup);
/**
* 标记消息为已消费
* @param messageId 消息唯一ID
* @param topic 消息主题
* @param consumerGroup 消费组
* @return 操作结果
*/
boolean markMessageConsumed(String messageId, String topic, String consumerGroup);
}
4.6.2 消息幂等服务实现类
package com.jam.demo.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.entity.MessageIdempotent;
import com.jam.demo.mapper.MessageIdempotentMapper;
import com.jam.demo.service.MessageIdempotentService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
/**
* 消息幂等服务实现类
* @author ken
* @date 2026-03-03
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageIdempotentServiceImpl extends ServiceImpl<MessageIdempotentMapper, MessageIdempotent> implements MessageIdempotentService {
private final PlatformTransactionManager transactionManager;
@Override
public boolean checkMessageConsumed(String messageId, String topic, String consumerGroup) {
if (!org.springframework.util.StringUtils.hasText(messageId) || !org.springframework.util.StringUtils.hasText(topic) || !org.springframework.util.StringUtils.hasText(consumerGroup)) {
log.error("消息幂等校验参数异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup);
return false;
}
LambdaQueryWrapper<MessageIdempotent> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MessageIdempotent::getMessageId, messageId)
.eq(MessageIdempotent::getTopic, topic)
.eq(MessageIdempotent::getConsumerGroup, consumerGroup)
.eq(MessageIdempotent::getConsumeStatus, 1);
MessageIdempotent messageIdempotent = this.getOne(queryWrapper);
return !ObjectUtils.isEmpty(messageIdempotent);
}
@Override
public boolean markMessageConsumed(String messageId, String topic, String consumerGroup) {
if (!org.springframework.util.StringUtils.hasText(messageId) || !org.springframework.util.StringUtils.hasText(topic) || !org.springframework.util.StringUtils.hasText(consumerGroup)) {
log.error("标记消息消费状态参数异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup);
return false;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(def);
try {
MessageIdempotent messageIdempotent = new MessageIdempotent();
messageIdempotent.setMessageId(messageId);
messageIdempotent.setTopic(topic);
messageIdempotent.setConsumerGroup(consumerGroup);
messageIdempotent.setConsumeStatus(1);
boolean saveResult = this.save(messageIdempotent);
transactionManager.commit(status);
return saveResult;
} catch (Exception e) {
transactionManager.rollback(status);
log.error("标记消息消费状态异常,messageId:{}, topic:{}, consumerGroup:{}", messageId, topic, consumerGroup, e);
return false;
}
}
}
4.6.3 订单服务接口
package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.entity.Order;
/**
* 订单服务接口
* @author ken
* @date 2026-03-03
*/
public interface OrderService extends IService<Order> {
/**
* 创建订单
* @param orderMessageDTO 订单消息DTO
* @return 操作结果
*/
boolean createOrder(OrderMessageDTO orderMessageDTO);
}
4.6.4 订单服务实现类
package com.jam.demo.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;
/**
* 订单服务实现类
* @author ken
* @date 2026-03-03
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
private final PlatformTransactionManager transactionManager;
@Override
public boolean createOrder(OrderMessageDTO orderMessageDTO) {
if (!StringUtils.hasText(orderMessageDTO.getOrderNo()) || orderMessageDTO.getUserId() == null || orderMessageDTO.getAmount() == null) {
log.error("创建订单参数异常,orderMessageDTO:{}", orderMessageDTO);
return false;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(def);
try {
Order order = new Order();
order.setOrderNo(orderMessageDTO.getOrderNo());
order.setUserId(orderMessageDTO.getUserId());
order.setAmount(orderMessageDTO.getAmount());
order.setOrderStatus(orderMessageDTO.getOrderStatus());
boolean saveResult = this.save(order);
transactionManager.commit(status);
log.info("创建订单成功,orderNo:{}", orderMessageDTO.getOrderNo());
return saveResult;
} catch (Exception e) {
transactionManager.rollback(status);
log.error("创建订单异常,orderNo:{}", orderMessageDTO.getOrderNo(), e);
return false;
}
}
}
4.7 消息生产者服务
package com.jam.demo.producer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.dto.OrderMessageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
/**
* RocketMQ消息生产者服务
* @author ken
* @date 2026-03-03
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RocketMQProducerService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 同步发送订单消息
* @param topic 消息主题
* @param orderMessageDTO 订单消息DTO
* @return 发送结果
*/
public boolean syncSendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
log.error("同步发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
return false;
}
try {
rocketMQTemplate.syncSend(topic, JSON.toJSONString(orderMessageDTO));
log.info("同步发送订单消息成功,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo());
return true;
} catch (Exception e) {
log.error("同步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
return false;
}
}
/**
* 异步发送订单消息
* @param topic 消息主题
* @param orderMessageDTO 订单消息DTO
*/
public void asyncSendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
log.error("异步发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
return;
}
try {
rocketMQTemplate.asyncSend(topic, JSON.toJSONString(orderMessageDTO), new org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("异步发送订单消息成功,topic:{}, orderNo:{}, msgId:{}", topic, orderMessageDTO.getOrderNo(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("异步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
}
});
} catch (Exception e) {
log.error("异步发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
}
}
/**
* 单向发送订单消息
* @param topic 消息主题
* @param orderMessageDTO 订单消息DTO
*/
public void oneWaySendOrderMessage(String topic, OrderMessageDTO orderMessageDTO) {
if (!StringUtils.hasText(topic) || orderMessageDTO == null) {
log.error("单向发送消息参数异常,topic:{}, orderMessageDTO:{}", topic, orderMessageDTO);
return;
}
try {
rocketMQTemplate.sendOneWay(topic, JSON.toJSONString(orderMessageDTO));
log.info("单向发送订单消息完成,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo());
} catch (Exception e) {
log.error("单向发送订单消息异常,topic:{}, orderNo:{}", topic, orderMessageDTO.getOrderNo(), e);
}
}
}
4.8 消息消费者服务(含幂等处理)
package com.jam.demo.consumer;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.service.MessageIdempotentService;
import com.jam.demo.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
/**
* 订单消息消费者
* @author ken
* @date 2026-03-03
*/
@Slf4j
@Service
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "order_topic",
consumerGroup = "order_consumer_group",
selectorExpression = "*"
)
public class OrderMessageConsumer implements RocketMQListener<String> {
private final MessageIdempotentService messageIdempotentService;
private final OrderService orderService;
private static final String TOPIC = "order_topic";
private static final String CONSUMER_GROUP = "order_consumer_group";
@Override
public void onMessage(String message) {
if (!StringUtils.hasText(message)) {
log.warn("收到空消息,直接跳过");
return;
}
// 获取消息上下文与messageId
org.apache.rocketmq.common.message.MessageExt messageExt = org.apache.rocketmq.spring.support.RocketMQUtil.getMessageExt();
String messageId = messageExt.getMsgId();
log.info("收到订单消息,messageId:{}, message:{}", messageId, message);
// 幂等校验
boolean isConsumed = messageIdempotentService.checkMessageConsumed(messageId, TOPIC, CONSUMER_GROUP);
if (isConsumed) {
log.warn("消息已消费,直接跳过,messageId:{}", messageId);
return;
}
// 解析消息
OrderMessageDTO orderMessageDTO;
try {
orderMessageDTO = JSON.parseObject(message, OrderMessageDTO.class);
} catch (Exception e) {
log.error("消息解析异常,messageId:{}, message:{}", messageId, message, e);
throw new RuntimeException("消息解析异常", e);
}
// 业务处理
boolean createResult = orderService.createOrder(orderMessageDTO);
if (!createResult) {
log.error("订单创建失败,消息将重试,messageId:{}, orderNo:{}", messageId, orderMessageDTO.getOrderNo());
throw new RuntimeException("订单创建失败,触发重试");
}
// 标记消息为已消费
boolean markResult = messageIdempotentService.markMessageConsumed(messageId, TOPIC, CONSUMER_GROUP);
if (!markResult) {
log.error("标记消息消费状态失败,messageId:{}", messageId);
throw new RuntimeException("标记消息消费状态失败,触发重试");
}
log.info("订单消息消费完成,messageId:{}, orderNo:{}", messageId, orderMessageDTO.getOrderNo());
}
}
4.9 接口控制器
package com.jam.demo.controller;
import com.jam.demo.dto.OrderMessageDTO;
import com.jam.demo.producer.RocketMQProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
/**
* 订单消息控制器
* @author ken
* @date 2026-03-03
*/
@Slf4j
@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
@Tag(name = "订单消息接口", description = "RocketMQ订单消息发送接口")
public class OrderMessageController {
private final RocketMQProducerService rocketMQProducerService;
private static final String ORDER_TOPIC = "order_topic";
@PostMapping("/sync/send")
@Operation(summary = "同步发送订单消息", description = "同步发送订单消息到RocketMQ,等待发送结果")
public ResponseEntity<Boolean> syncSendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
boolean result = rocketMQProducerService.syncSendOrderMessage(ORDER_TOPIC, orderMessageDTO);
return ResponseEntity.ok(result);
}
@PostMapping("/async/send")
@Operation(summary = "异步发送订单消息", description = "异步发送订单消息到RocketMQ,不阻塞主线程")
public ResponseEntity<String> asyncSendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
rocketMQProducerService.asyncSendOrderMessage(ORDER_TOPIC, orderMessageDTO);
return ResponseEntity.ok("异步消息发送请求已提交");
}
@PostMapping("/oneway/send")
@Operation(summary = "单向发送订单消息", description = "单向发送订单消息到RocketMQ,不等待返回结果")
public ResponseEntity<String> oneWaySendOrderMessage(@RequestBody OrderMessageDTO orderMessageDTO) {
rocketMQProducerService.oneWaySendOrderMessage(ORDER_TOPIC, orderMessageDTO);
return ResponseEntity.ok("单向消息发送完成");
}
}
4.10 项目启动类
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
/**
* RocketMQ Demo启动类
* @author ken
* @date 2026-03-03
*/
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
@OpenAPIDefinition(
info = @Info(
title = "RocketMQ Demo API",
version = "1.0",
description = "RocketMQ Spring Boot 整合示例API文档"
)
)
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
五、生产环境最佳实践与常见问题排查
5.1 消息可靠性保障最佳实践
生产端
- 核心消息优先使用同步发送,避免使用单向发送,确保消息发送结果可感知。
- 开启合理的发送重试,设置重试次数为2-3次,配合故障规避机制,自动切换到健康的Broker节点。
- 发送失败的消息需做好兜底处理,存入本地数据库,定时任务重试,避免消息丢失。
- 分布式事务场景必须使用RocketMQ原生事务消息机制,保证本地事务与消息发送的原子性。
Broker端
- 金融级场景使用同步复制+同步刷盘,普通核心场景使用Dledger模式+异步刷盘,平衡可靠性与性能。
- 生产环境关闭自动创建Topic和订阅组,提前规划并创建Topic,避免权限失控。
- 合理设置消息保留时间,默认3天,根据业务需求调整,避免磁盘空间耗尽。
- 开启消息轨迹,方便消息全链路追踪与问题排查。
消费端
- 消费失败必须抛出异常,触发RocketMQ的重试机制,禁止捕获异常后不处理导致消息丢失。
- 必须实现消费幂等,RocketMQ保证消息至少投递一次,重复投递是常态,幂等是消费端的必备能力。
- 死信队列必须配置监控告警,及时处理消费失败的死信消息,避免业务异常。
- 消费逻辑避免耗时操作,优化数据库、远程调用等逻辑,提升消费速度,避免消息堆积。
5.2 性能优化最佳实践
- 批量发送与消费:非实时性消息使用批量发送,减少网络IO次数,提升吞吐量;消费端开启批量拉取,减少RPC调用次数。
- 合理设置队列数量:单个Topic的队列数量建议与Broker的CPU核心数相当,过多的队列会导致ConsumeQueue文件过多,影响IO性能。
- JVM优化:合理设置堆内存与堆外内存,避免频繁GC;新生代大小设置为堆内存的50%,提升短生命周期对象的回收效率。
- 磁盘优化:使用SSD固态硬盘,CommitLog、ConsumeQueue、IndexFile分别挂载到不同的磁盘,避免IO竞争。
- 消息压缩:大于4KB的消息开启压缩,减少网络传输量,提升发送与消费性能。
5.3 常见问题排查
1. 消息堆积
- 排查思路:先确认消费端是否正常运行,是否有报错;再查看消费逻辑的耗时,是否消费速度跟不上生产速度;最后查看Topic的队列数量与消费者数量是否匹配。
- 解决方案:优化消费逻辑,减少耗时操作;增加消费者数量,提升消费能力;临时增加Topic的队列数量,提升并行消费能力;对于历史堆积消息,可通过重置消费位点跳过,单独处理。
2. 消息丢失
- 排查思路:从生产端、Broker端、消费端全链路排查,生产端是否发送成功,Broker端是否配置了异步刷盘+异步复制,消费端是否捕获了异常未抛出导致消息被标记为已消费。
- 解决方案:生产端使用同步发送,确认发送结果;Broker端根据业务需求配置合理的刷盘与复制策略;消费端消费失败必须抛出异常,禁止静默处理。
3. 消息重复消费
- 排查思路:RocketMQ的重试机制、重平衡、消费者重启都会导致消息重复投递,核心原因是消费端未实现幂等处理。
- 解决方案:基于消息唯一ID实现数据库唯一索引幂等,或者基于业务唯一键实现幂等,确保同一条消息多次消费结果一致。
六、总结
RocketMQ 作为国内最主流的分布式消息中间件,其优秀的架构设计、金融级的可靠性、超高的性能,完全能够满足绝大多数企业级业务场景的需求。本文从核心底层原理,到生产级集群部署,再到业务实战,全面讲解了RocketMQ的核心知识,所有内容均基于官方权威文档与生产实践,确保准确无误。
掌握RocketMQ的核心,本质上是理解其“高性能存储设计”、“可靠性保障机制”、“分布式高可用架构”三大核心,只有吃透底层逻辑,才能在生产环境中用好RocketMQ,避免踩坑,充分发挥其性能与可靠性优势。