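RocketMQ, the distributed messaging middleware open-sourced by Alibaba, offers high throughput, low latency, and high availability, and has become a standard component of core systems in finance, e-commerce, and internet services. In production, however, "it runs" is only the first step. Stable operation depends on a thorough monitoring system and standardized operational procedures: once message backlog, a Broker crash, or consumption failures occur, they can directly trigger a cascading failure across the business. This article starts from the underlying principles, breaks down RocketMQ's monitoring system, and walks through the core operational tasks with hands-on examples, so that you understand the internals and can also solve real production problems.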
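1. The underlying logic of RocketMQ monitoring
To monitor and operate RocketMQ well, you first need to understand its core architecture and be clear about each component's responsibility and the path the data takes:
- NameServer: the registry. It maintains Broker routing information, and its stateless design keeps it highly available;
- Broker: the core storage and forwarding component, deployed in a Master/Slave architecture. The Master handles reads and writes; the Slave replicates data for disaster recovery;
- Producer/Consumer: the message producing and consuming clients, which fetch Broker routes from the NameServer and then talk to the Brokers;
- Storage layer: CommitLog stores the raw messages (the physical log), ConsumeQueue stores the message indexes (the logical queues), and IndexFile supports index-based message lookup.
Monitoring essentially means tracking "data flow (produce → store → consume)" and "component health (hardware plus software state)" so that abnormal nodes or bottlenecks are found in time.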
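2. Core monitoring metrics in detail (what they mean and how to obtain them)
2.1 Broker core metrics
The Broker is the top monitoring priority because all message storage and forwarding depends on it. Focus on the following dimensions:
2.1.1 Message send/receive metrics
- Produce TPS (msgPutTotalTps): the total number of messages the Broker receives per second, reflecting the pressure from Producers; underneath, it is accumulated from the request rate handled by SendMessageProcessor;
- Consume TPS (msgGetTotalTps): the total number of messages the Broker delivers per second, reflecting Consumer capacity;
- Cumulative message counts (msgPutTotalCount/msgGetTotalCount): the total number of messages produced/consumed since the Broker started, used to gauge business volume.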
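The relationship between the cumulative counts and the TPS figures can be sketched as follows: sample a cumulative counter twice and divide the difference by the elapsed time. This is a minimal illustration under that assumption, not the Broker's own implementation; the class and method names (TpsCalculator, tps) are hypothetical.

```java
/** Derives an average TPS from two samples of a cumulative message counter. */
final class TpsCalculator {
    private TpsCalculator() { }

    /**
     * @param countStart    cumulative count at the first sample
     * @param countEnd      cumulative count at the second sample
     * @param elapsedMillis time between the two samples, in milliseconds
     * @return average messages per second over the sampling window
     */
    static double tps(long countStart, long countEnd, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        if (countEnd < countStart) {
            // A cumulative counter only moves backwards if it was reset
            // (for example by a Broker restart), so the window is unusable.
            throw new IllegalArgumentException("counter reset detected");
        }
        return (countEnd - countStart) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // 1,500 messages arrived within a 10-second window
        System.out.println(tps(120_000L, 121_500L, 10_000L)); // prints 150.0
    }
}
```

Sampling over a longer window smooths out bursts, while a shorter window reacts faster to spikes; which one suits an alert rule depends on the traffic pattern.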
Code for fetching these in practice:
package com.jam.demo.rocketmq.admin;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
// RocketMQ 4.x package names; in 5.x these classes live under org.apache.rocketmq.remoting.protocol
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
 * Utility controller for fetching Broker monitoring metrics
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/broker")
@Tag(name = "Broker monitoring API", description = "Fetches the Broker's core monitoring metrics")
@Slf4j
public class BrokerMonitorController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the Broker's produce/consume TPS
     * @param brokerName Broker name
     * @return produce/consume TPS information
     * @throws Exception on MQ client, Broker, or remoting errors
     */
    @GetMapping("/stats/{brokerName}")
    @Operation(summary = "Get Broker produce/consume TPS", description = "Looks up produce/consume TPS by Broker name")
    public String getBrokerStats(
            @Parameter(description = "Broker name", required = true) @PathVariable String brokerName
    ) throws Exception {
        if (!StringUtils.hasText(brokerName)) {
            throw new IllegalArgumentException("Broker name must not be empty");
        }
        // Resolve the Master address of the named Broker from the cluster info
        ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
        BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
        String masterAddr = brokerData == null ? null : brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr == null) {
            log.warn("No Master address found for Broker[{}]", brokerName);
            return "Broker[" + brokerName + "] has no monitoring data";
        }
        // The runtime stats are the same key/value table that `mqadmin brokerStatus` prints
        KVTable stats = admin.fetchBrokerRuntimeStats(masterAddr);
        return String.format("Broker[%s] putTps=%s, getTotalTps=%s", brokerName,
                stats.getTable().get("putTps"), stats.getTable().get("getTotalTps"));
    }
}
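2.1.2 Storage metrics
The Broker's storage layer directly affects message reliability and read/write performance. Focus on:
- CommitLog disk usage (commitLogDiskUsedRatio): usage of the disk that holds the CommitLog. Keeping it at or below 80% is recommended. With default settings the Broker starts cleaning expired files once usage passes 75% (diskMaxUsedSpaceRatio), and if usage keeps climbing (to about 90% by default) it marks the disk as full and rejects new messages;
- Flush latency (flushCommitLogTimediff): the delay between a message being written to memory and being flushed to disk. It should be close to 0 with synchronous flush, and ≤10ms is suggested with asynchronous flush; otherwise a crash may lose data;
- ConsumeQueue disk usage (consumeQueueDiskUsedRatio): storage pressure on the logical queues; same threshold as CommitLog.
How to obtain them: expose the metrics through the Prometheus Exporter. Example Prometheus configuration: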
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'rocketmq-exporter'
    static_configs:
      - targets: ['127.0.0.1:5557']  # RocketMQ Exporter port
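2.1.3 Thread and connection metrics
- Thread pool queue length (sendThreadPoolQueueSize): the length of the send thread pool's waiting queue. A value above 500 means the pool is under heavy pressure and requests will be delayed;
- Client connection count (clientConnectionCount): the total number of client connections to the Broker. Too many connections consume file descriptors; keeping it ≤10000 is suggested (the system limit ulimit -n must be raised accordingly).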
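2.2 Topic metrics
A Topic is the logical category of messages. The core thing to monitor is the balance between production and consumption:
- Produce/consume rate (topicPutTps/topicGetTps): the consume rate must be ≥ the produce rate, otherwise messages pile up;
- Message backlog (msgAccumulation): the number of messages by which consumption progress lags behind production progress, computed as cumulative produced count minus cumulative consumed count.
Easily confused concepts:
- Message backlog: consumption progress lags behind production progress; this is a "quantity" concept;
- Consumption latency: the time between a single message being produced and being consumed; this is a "time" concept;
Backlog inevitably causes latency, but latency does not necessarily mean backlog (for example, the consume logic is slow for each message while the TPS still keeps up).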
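To make the distinction concrete, here is a minimal sketch that computes backlog as a quantity (the sum over queues of the Broker's max offset minus the committed consume offset) and latency as a time difference for a single message. The class name, method names, and sample offsets are hypothetical and only illustrate the arithmetic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrates backlog (a message count) versus consumption latency (a duration). */
final class BacklogCalculator {
    private BacklogCalculator() { }

    /**
     * Sums the per-queue lag.
     * Each value holds two offsets: index 0 = Broker max offset, index 1 = consume offset.
     */
    static long totalBacklog(Map<String, long[]> offsetsByQueue) {
        long total = 0L;
        for (long[] offsets : offsetsByQueue.values()) {
            // Guard against a consume offset that is momentarily ahead of the sample
            total += Math.max(0L, offsets[0] - offsets[1]);
        }
        return total;
    }

    /** Latency of one message: the time it was consumed minus the time it was stored. */
    static long latencyMillis(long storeTimestamp, long consumeTimestamp) {
        return consumeTimestamp - storeTimestamp;
    }

    public static void main(String[] args) {
        Map<String, long[]> offsets = new LinkedHashMap<>();
        offsets.put("broker-a:0", new long[] {1000L, 990L});
        offsets.put("broker-a:1", new long[] {800L, 800L});
        offsets.put("broker-b:0", new long[] {500L, 350L});
        System.out.println("backlog = " + totalBacklog(offsets));           // prints backlog = 160
        System.out.println("latency = " + latencyMillis(1_000L, 1_250L));   // prints latency = 250
    }
}
```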
Code for fetching the backlog:
package com.jam.demo.rocketmq.admin;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
 * Utility controller for fetching Topic monitoring metrics
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/topic")
@Tag(name = "Topic monitoring API", description = "Fetches produce/consume and backlog metrics for a Topic")
@Slf4j
public class TopicMonitorController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the consumption backlog of a Topic
     * @param topic Topic name
     * @param consumerGroup consumer group name
     * @return backlog information
     * @throws Exception on MQ client, Broker, or remoting errors
     */
    @GetMapping("/accumulation/{topic}/{consumerGroup}")
    @Operation(summary = "Get Topic backlog", description = "Looks up the message backlog by Topic and consumer group")
    public String getTopicAccumulation(
            @Parameter(description = "Topic name", required = true) @PathVariable String topic,
            @Parameter(description = "Consumer group name", required = true) @PathVariable String consumerGroup
    ) throws Exception {
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("Topic name must not be empty");
        }
        if (!StringUtils.hasText(consumerGroup)) {
            throw new IllegalArgumentException("Consumer group name must not be empty");
        }
        // Pass the topic as well, so the stats cover only this Topic rather than the whole group
        ConsumeStats consumeStats = admin.examineConsumeStats(consumerGroup, topic);
        if (consumeStats == null || consumeStats.getOffsetTable().isEmpty()) {
            log.warn("Consume stats for consumer group[{}] are empty", consumerGroup);
            return "Consumer group[" + consumerGroup + "] has no consumption data";
        }
        // computeTotalDiff() sums (brokerOffset - consumerOffset) over all queues
        long totalAccumulation = consumeStats.computeTotalDiff();
        return String.format("Topic[%s] consumer group[%s] backlog: %d", topic, consumerGroup, totalAccumulation);
    }
}
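2.3 Consumer metrics
The Consumer is the last stage of message consumption. Focus on consumption capacity and failure handling:
- Consumption progress (consumeOffset): should stay close to the Broker's maximum offset (maxOffset); a large gap indicates backlog;
- Retry count (retryTimes): how many times a message has been retried after consumption failed; by default, once it exceeds 16 retries the message goes to the dead-letter queue (DLQ);
- Dead-letter queue message count (dlqMsgCount): dead-letter messages need to be handled regularly, otherwise they take up storage.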
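The retry-then-DLQ rule can be sketched as below. The %RETRY% and %DLQ% prefixes are the ones RocketMQ puts in front of the consumer group name, and 16 is the default retry limit mentioned above; the class and method names are hypothetical, and the comparison is a simplified reading of the Broker's behavior rather than its actual code.

```java
/** Sketch of RocketMQ's retry/DLQ naming and the retry-limit decision. */
final class RetryTopics {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    private RetryTopics() { }

    /** Name of the retry Topic that belongs to a consumer group. */
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** Name of the dead-letter Topic that belongs to a consumer group. */
    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    /** True when a failed message has used up its retries and should move to the DLQ. */
    static boolean goesToDlq(int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes;
    }

    public static void main(String[] args) {
        System.out.println(retryTopic("OrderConsumerGroup")); // prints %RETRY%OrderConsumerGroup
        System.out.println(dlqTopic("OrderConsumerGroup"));   // prints %DLQ%OrderConsumerGroup
        System.out.println(goesToDlq(16, DEFAULT_MAX_RECONSUME_TIMES)); // prints true
    }
}
```

Because the DLQ Topic is keyed by consumer group rather than by the original Topic, a dlqMsgCount alert is most useful when it is labeled with the group name.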
3. Monitoring tools in practice (from lightweight to enterprise-grade)
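3.1 RocketMQ Dashboard (lightweight first choice)
RocketMQ Dashboard is a Spring Boot based web console. It is maintained as a separate project (apache/rocketmq-dashboard) and runs as its own process connected to the NameServer, so the Broker itself needs no Dashboard-specific setting:
3.1.1 Configuration steps
- Edit the Broker configuration file broker.conf:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
# The Broker's actual IP
brokerIP1=192.168.1.100
- Start NameServer and Broker:
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/broker.conf &
- Start the Dashboard application with its NameServer address set to 127.0.0.1:9876, then open http://192.168.1.100:8080 (default port 8080) to view cluster status, Topic backlog, consumption progress, and other core metrics.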
3.2 Prometheus + Grafana (enterprise-grade monitoring)
Large clusters need stronger visualization and alerting; the Prometheus + Grafana combination is recommended:
3.2.1 Deploy RocketMQ Exporter
git clone https://github.com/apache/rocketmq-exporter.git
cd rocketmq-exporter
mvn clean package -DskipTests
nohup java -jar target/rocketmq-exporter-0.2.0.jar \
--rocketmq.config.namesrvAddr=127.0.0.1:9876 \
--server.port=5557 &
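3.2.2 Configure the Grafana dashboard
- Import the official Dashboard (ID: 10477) and select the Prometheus data source;
- Customize panels for the key metrics (metric names as listed in the rocketmq-exporter README; check them against your exporter version):
  - Produce TPS: sum(rocketmq_producer_tps) by (broker)
  - Consume TPS: sum(rocketmq_consumer_tps) by (broker)
  - Backlog trend: sum(rocketmq_group_diff) by (topic)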
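The same expressions can be checked outside Grafana through Prometheus's instant-query HTTP endpoint (/api/v1/query). The sketch below only builds the request URL; the Prometheus address 127.0.0.1:9090 and the class name PromQueryUrl are assumptions for illustration, and the query assumes the exporter exposes rocketmq_producer_tps.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Builds a Prometheus instant-query URL for a PromQL expression. */
final class PromQueryUrl {
    private PromQueryUrl() { }

    /**
     * @param prometheusBase base URL of the Prometheus server, without a trailing slash
     * @param promQl         the PromQL expression, passed through unchanged
     */
    static String instantQueryUrl(String prometheusBase, String promQl) {
        // The expression must be URL-encoded because it contains spaces and parentheses
        return prometheusBase + "/api/v1/query?query="
                + URLEncoder.encode(promQl, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String url = instantQueryUrl("http://127.0.0.1:9090",
                "sum(rocketmq_producer_tps) by (broker)");
        System.out.println(url);
    }
}
```

Fetching that URL with any HTTP client returns a JSON document, which makes it easy to confirm that a panel expression returns data before wiring it into a dashboard or an alert rule.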
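4. Core operations (deployment → scaling → fault handling)
4.1 Highly available cluster deployment (2 Masters and 2 Slaves as an example)
A 2-Master, 2-Slave cluster is recommended for production so that a single Broker going down does not affect the service:
4.1.1 Cluster plan
- NameServer: 2 machines (192.168.1.101/102);
- Broker Master1: 192.168.1.103 (broker-a, brokerId=0);
- Broker Slave1: 192.168.1.104 (broker-a, brokerId=1);
- Broker Master2: 192.168.1.105 (broker-b, brokerId=0);
- Broker Slave2: 192.168.1.106 (broker-b, brokerId=1).
4.1.2 Master configuration example (broker-a.conf)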
brokerClusterName=ProductionCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
# Disable automatic Topic creation in production
autoCreateTopicEnable=false
maxMessageSize=65536
4.1.3 Slave configuration example (broker-a-slave.conf)
brokerClusterName=ProductionCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
storePathRootDir=/data/rocketmq/store
# Master1 address (the Slave pairs with its Master through the shared brokerName)
masterAddr=192.168.1.103:10911
4.1.4 Start the cluster
# Start NameServer
nohup sh bin/mqnamesrv &
# Start Master1
nohup sh bin/mqbroker -c conf/broker-a.conf &
# Start Slave1
nohup sh bin/mqbroker -c conf/broker-a-slave.conf &
# Start Master2 and Slave2 the same way
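4.2 Cluster scaling (adding Broker nodes)
4.2.1 Add Master3 + Slave3
- Prepare broker-c.conf (Master3) and broker-c-slave.conf (Slave3), following 4.1.2 and 4.1.3;
- Start the new Brokers; they register with the NameServer automatically;
- Assign the Topic to the new Broker: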
sh bin/mqadmin updateTopic -n 192.168.1.101:9876 -t OrderTopic -c ProductionCluster -b 192.168.1.107:10911
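4.3 Common fault handling
4.3.1 Broker down (Master failure)
Steps:
- Check the Broker status:
sh bin/mqadmin brokerStatus -n 192.168.1.101:9876 -b 192.168.1.103:10911
- If a Master is down: in the classic Master/Slave deployment from 4.1 the Slave is not promoted automatically. Consumers can keep reading from the Slave while Producers send to the remaining Masters. Automatic promotion requires DLedger mode (4.5+) or Controller mode (5.x); brokerRole=SYNC_MASTER only makes replication synchronous and does not enable failover;
- After repairing the failed Master: in classic mode, restart it with its original Master configuration. If a new Master has taken over, set brokerId=1 and restart it as a Slave so that it joins the new Master.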
4.3.2 Message backlog (insufficient consume rate)
Temporary measure: reset the consumption progress (for emergencies only):
package com.jam.demo.rocketmq.admin;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Map;

/**
 * Utility controller for managing consumption progress
 * @author ken
 */
@RestController
@RequestMapping("/rocketmq/consumer")
@Tag(name = "Consumption progress API", description = "Resets consumption progress to deal with message backlog")
@Slf4j
public class ConsumerProgressController {
    private static final DefaultMQAdminExt admin = new DefaultMQAdminExt();

    static {
        admin.setNamesrvAddr("127.0.0.1:9876");
        try {
            admin.start();
        } catch (MQClientException e) {
            log.error("Failed to start DefaultMQAdminExt", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Resets the consumption progress to a given point in time
     * @param consumerGroup consumer group name
     * @param topic Topic name
     * @param timestamp timestamp in milliseconds
     * @return result of the operation
     * @throws Exception on MQ client, Broker, or remoting errors
     */
    @PostMapping("/resetOffsetByTime")
    @Operation(summary = "Reset consumption progress", description = "Resets a consumer group's progress on a Topic to a given point in time")
    public String resetOffsetByTime(
            @Parameter(description = "Consumer group name", required = true) @RequestParam String consumerGroup,
            @Parameter(description = "Topic name", required = true) @RequestParam String topic,
            @Parameter(description = "Target timestamp (milliseconds)", required = true) @RequestParam long timestamp
    ) throws Exception {
        if (!StringUtils.hasText(consumerGroup)) {
            throw new IllegalArgumentException("Consumer group name must not be empty");
        }
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("Topic name must not be empty");
        }
        // Argument order is topic, group, timestamp, force; the consumer group must be online
        Map<MessageQueue, Long> rollbackOffsets = admin.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
        if (CollectionUtils.isEmpty(rollbackOffsets)) {
            log.warn("Topic[{}] has no message queues", topic);
            return "Topic[" + topic + "] has no message queues";
        }
        return String.format("Consumer group[%s] Topic[%s] progress reset to timestamp[%d]", consumerGroup, topic, timestamp);
    }
}
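4.3.3 Dead-letter queue handling
View dead-letter messages (the DLQ Topic name is %DLQ% followed directly by the consumer group name):
sh bin/mqadmin queryMsgByKey -n 192.168.1.101:9876 -t %DLQ%OrderConsumerGroup -k orderId123
Resend a dead-letter message: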
sh bin/mqadmin sendMessage -n 192.168.1.101:9876 -t OrderTopic -p "{\"orderId\":\"123\",\"status\":\"success\"}"
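5. Production-grade operations best practices
5.1 Monitoring and alerting strategy
5.1.1 Core metric thresholds
| Metric | Threshold | Alert level |
| --- | --- | --- |
| Broker disk usage | >85% | Critical |
| Topic backlog | >10000 | Warning |
| Consume TPS < produce TPS | Sustained for 5 minutes | Warning |
| Broker down | Not recovered within 1 minute | Critical |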
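As a rough sketch, the thresholds in the table could be evaluated in code like this. The class AlertRules, its parameter names, and the message strings are hypothetical; in a real deployment these rules would more likely live in Prometheus alerting rules than in application code.

```java
import java.util.ArrayList;
import java.util.List;

/** Evaluates the four threshold rules from the table against sampled values. */
final class AlertRules {
    private AlertRules() { }

    /**
     * @param diskUsedRatio          Broker disk usage as a fraction, e.g. 0.87 for 87%
     * @param topicBacklog           number of unconsumed messages on the Topic
     * @param secondsConsumeLagging  how long consume TPS has stayed below produce TPS
     * @param secondsBrokerDown      how long the Broker has been unreachable
     * @return one entry per rule that fired, prefixed with its alert level
     */
    static List<String> evaluate(double diskUsedRatio, long topicBacklog,
                                 long secondsConsumeLagging, long secondsBrokerDown) {
        List<String> alerts = new ArrayList<>();
        if (diskUsedRatio > 0.85) {
            alerts.add("CRITICAL: Broker disk usage above 85%");
        }
        if (topicBacklog > 10_000) {
            alerts.add("WARNING: Topic backlog above 10000");
        }
        if (secondsConsumeLagging >= 5 * 60) {
            alerts.add("WARNING: consume TPS below produce TPS for 5 minutes");
        }
        if (secondsBrokerDown >= 60) {
            alerts.add("CRITICAL: Broker down for 1 minute");
        }
        return alerts;
    }

    public static void main(String[] args) {
        // 87% disk usage and a backlog of 25,000 messages trigger two alerts
        evaluate(0.87, 25_000, 0, 0).forEach(System.out::println);
    }
}
```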
5.1.2 Alert configuration (AlertManager)
global:
  resolve_timeout: 5m
route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'dingding'
receivers:
  - name: 'dingding'
    webhook_configs:
      - url: 'http://dingding-webhook:8080/send'  # DingTalk bot webhook
        send_resolved: true
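5.2 Operations automation scripts
5.2.1 Broker status check script (check_broker.sh)
#!/bin/bash
# Checks every Broker and pushes an alert to Alertmanager when one does not respond
NAMESRV_ADDR="192.168.1.101:9876"
BROKER_LIST=("192.168.1.103:10911" "192.168.1.105:10911")
for broker in "${BROKER_LIST[@]}"; do
    # brokerStatus prints the runtime stats as key/value lines; a healthy Broker reports
    # bootTimestamp, so its absence is treated here as "Broker not responding"
    output=$(sh bin/mqadmin brokerStatus -n "$NAMESRV_ADDR" -b "$broker" 2>&1)
    if ! echo "$output" | grep -q "bootTimestamp"; then
        echo "Broker $broker is in an abnormal state"
        curl -s -X POST http://alertmanager:9093/api/v2/alerts \
            -H "Content-Type: application/json" \
            -d '[{
                "labels": {"alertname": "BrokerDown", "broker": "'"$broker"'"},
                "annotations": {"description": "Broker '"$broker"' did not return runtime stats"}
            }]'
    fi
done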
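5.3 Performance tuning
5.3.1 Broker parameter tuning
- sendMessageThreadPoolNums: size of the send thread pool; CPU cores × 2 is suggested;
- mapedFileSizeCommitLog: size of each CommitLog file; enlarging it toward 2GB reduces the number of files (the default is 1GB, and because the setting is an int the value must stay below 2^31 bytes);
- transientStorePoolEnable: enables the transient store pool to improve flush performance (requires enough memory).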
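The "CPU cores × 2" suggestion can be turned into a concrete value on the target machine with a few lines. This is only a sketch of that rule of thumb; the class name BrokerTuning is hypothetical, and the result should still be validated under real load.

```java
/** Derives a suggested sendMessageThreadPoolNums value from the CPU core count. */
final class BrokerTuning {
    private BrokerTuning() { }

    /** Applies the "CPU cores x 2" rule of thumb. */
    static int sendMessageThreadPoolNums(int cpuCores) {
        if (cpuCores <= 0) {
            throw new IllegalArgumentException("cpuCores must be positive");
        }
        return cpuCores * 2;
    }

    public static void main(String[] args) {
        // Number of logical processors visible to the JVM on this machine
        int cores = Runtime.getRuntime().availableProcessors();
        // Prints a line that can be pasted into broker.conf
        System.out.println("sendMessageThreadPoolNums=" + sendMessageThreadPoolNums(cores));
    }
}
```

Inside a container, availableProcessors() reflects the CPU limit seen by the JVM, which may be lower than the host's core count.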
5.3.2 JVM tuning (jvm.options)
-server
-Xms8g
-Xmx8g
-Xmn4g
-XX:MetaspaceSize=256m
-XX:MaxMetaspaceSize=256m
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=20
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/rocketmq/logs/heapdump.hprof
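6. Summary
The core of RocketMQ monitoring and operations is "understand the internals and standardize the operations": the monitoring system must cover the full data-flow path, and operational procedures must follow high-availability design principles. By building a closed loop of "metric monitoring + alerting + automation", you can spot potential problems early and handle failures quickly. This article has gone from principles to practice, covering monitoring metrics, tooling, operational tasks, and best practices, and hopefully it helps you keep RocketMQ running stably in production. Remember: the stability of the messaging middleware is the foundation of a highly available business system.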