引言
在分布式系统架构中,消息中间件是实现异步通信、解耦服务、削峰填谷的核心组件。Apache RocketMQ作为阿里开源的分布式消息队列,凭借高吞吐、低延迟、高可靠的特性,成为金融、电商、物流等领域的首选方案。但很多开发者对RocketMQ的集群部署原理模糊,入门实践踩坑不断。本文将从底层逻辑到实战操作,全方位解密RocketMQ集群部署与快速入门,让你既能夯实基础,又能解决实际生产问题。
一、RocketMQ核心概念拆解
要搞懂集群部署,先吃透核心组件和术语——这是理解RocketMQ运行逻辑的基石,所有设计都围绕这些组件展开(参考RocketMQ官方文档V5.1.4核心架构说明)。
1.1 核心组件
- NameServer:RocketMQ的“通讯录”,负责管理Broker节点的注册与发现,保存Topic与Broker的映射关系。无状态设计,集群节点间不通信,客户端通过轮询NameServer获取最新Broker地址。
- Broker:消息的“快递站”,负责消息的存储、发送和消费。Broker分为Master(主节点)和Slave(从节点),Master负责读写,Slave仅做备份和读,保证高可用。
- Producer:消息生产者,负责发送消息到Broker,支持同步、异步、单向三种发送模式。
- Consumer:消息消费者,从Broker拉取或推送消息消费,支持集群消费(同一Topic消息只被消费一次)和广播消费(所有消费者都收到消息)。
1.2 关键术语
- Topic:消息的“分类标签”,生产者按Topic发消息,消费者按Topic订阅消息。
- Queue:Topic的物理分区,每个Topic可划分为多个Queue,分布在不同Broker上,实现负载均衡和并行消费。
- Message:消息载体,包含主题、标签、内容、属性等信息。
- Offset:消息在Queue中的位置标识,消费者通过Offset记录消费进度。
二、RocketMQ集群架构深度解析
RocketMQ集群部署模式决定了系统的可用性和性能,官方推荐生产环境采用“多Master多Slave”架构(参考RocketMQ官方部署指南)。
2.1 集群模式分类
- 单Master模式:仅1个Master节点,简单但无高可用,适合测试环境。
- 多Master模式:多个Master节点,无Slave,性能高但单个Master故障会丢失数据。
- 多Master多Slave模式:每个Master配1个Slave,支持两种同步策略:
- 同步双写(SYNC_MASTER):Master和Slave同步写入成功后才返回生产者,数据零丢失但性能略降。
- 异步复制(ASYNC_MASTER):Master写入成功后立即返回,异步同步到Slave,性能高但极端情况可能丢数据。
2.2 集群架构图
(注:该架构为生产环境推荐的“3Master+3Slave同步双写”模式)
三、RocketMQ集群部署实战(Linux环境)
3.1 环境准备
- 操作系统:CentOS 7.9(64位)
- JDK版本:17.0.9(RocketMQ 5.x要求JDK 8+,推荐JDK17)
- RocketMQ版本:5.1.4(最新稳定版)
- 服务器规划:
| 节点 | IP地址 | 角色 |
| node1 | 192.168.1.10 | NameServer/Broker-Master1 |
| node2 | 192.168.1.11 | NameServer/Broker-Master2 |
| node3 | 192.168.1.12 | NameServer/Broker-Master3 |
| node4 | 192.168.1.13 | Broker-Slave1 |
| node5 | 192.168.1.14 | Broker-Slave2 |
| node6 | 192.168.1.15 | Broker-Slave3 |
3.2 安装JDK17
# 下载JDK17
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
# 解压
tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/
# 配置环境变量
echo "export JAVA_HOME=/usr/local/jdk-17.0.9" >> /etc/profile
echo "export PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile
source /etc/profile
# 验证
java -version # 输出jdk-17.0.9即成功
3.3 安装RocketMQ
# 下载RocketMQ 5.1.4
wget https://archive.apache.org/dist/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
# 解压
unzip rocketmq-all-5.1.4-bin-release.zip -d /usr/local/
mv /usr/local/rocketmq-all-5.1.4-bin-release /usr/local/rocketmq
# 配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile
3.4 配置NameServer集群
NameServer集群无需额外配置,只需在每个节点启动NameServer即可:
# 在node1/node2/node3分别执行
nohup sh $ROCKETMQ_HOME/bin/mqnamesrv &
# 查看日志验证启动
tail -f ~/logs/rocketmqlogs/namesrv.log
# 日志出现"NameServer startup successfully"即成功
3.5 配置Broker集群
3.5.1 创建Broker配置文件
在node1(Master1)创建broker-m1.properties:
# 集群名称
brokerClusterName=JamCluster
# Broker名称
brokerName=broker-m1
# Broker ID(Master为0,Slave为1+)
brokerId=0
# NameServer地址(多个用;分隔)
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# 监听端口
listenPort=10911
# 存储路径
storePathRootDir=/usr/local/rocketmq/store/m1
storePathCommitLog=/usr/local/rocketmq/store/m1/commitlog
# 同步策略(同步双写)
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
# 自动创建Topic
autoCreateTopicEnable=true
在node4(Slave1)创建broker-s1.properties:
brokerClusterName=JamCluster
brokerName=broker-m1
brokerId=1
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
listenPort=10912
storePathRootDir=/usr/local/rocketmq/store/s1
storePathCommitLog=/usr/local/rocketmq/store/s1/commitlog
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH
同理,在node2/node5创建broker-m2.properties和broker-s2.properties,node3/node6创建broker-m3.properties和broker-s3.properties(仅修改brokerName、brokerId、端口和存储路径)。
3.5.2 启动Broker集群
# 在node1启动Master1
nohup sh $ROCKETMQ_HOME/bin/mqbroker -c /usr/local/rocketmq/conf/broker-m1.properties &
# 在node4启动Slave1
nohup sh $ROCKETMQ_HOME/bin/mqbroker -c /usr/local/rocketmq/conf/broker-s1.properties &
# 同理启动node2/node5、node3/node6的Broker
# 查看Broker日志验证
tail -f ~/logs/rocketmqlogs/broker.log
# 日志出现"register broker to name server successfully"即成功
3.6 集群验证
# 使用RocketMQ工具查看集群状态
sh $ROCKETMQ_HOME/bin/mqadmin clusterList -n 192.168.1.10:9876
# 输出包含所有Master和Slave节点信息即集群部署成功
四、RocketMQ快速入门:Java客户端实战
4.1 项目依赖配置(Maven)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jam.demo</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<dependencies>
<!-- SpringBoot核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ客户端依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.1.4</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Spring工具类 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>6.1.1</version>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<!-- Guava集合工具 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.2.0</version>
</dependency>
<!-- MyBatisPlus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 生产者实现(同步发送)
package com.jam.demo.producer;
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import javax.annotation.PostConstruct;
import java.util.Map;
/**
* RocketMQ生产者示例(同步发送)
* @author ken
*/
@RestController
@RequestMapping("/rocketmq/producer")
@Tag(name = "RocketMQ生产者接口", description = "消息发送相关接口")
@Slf4j
public class SyncProducerController {
private DefaultMQProducer producer;
/**
* 初始化生产者
*/
@PostConstruct
public void initProducer() {
// 创建生产者实例,指定生产者组
producer = new DefaultMQProducer("jam-producer-group");
// 设置NameServer地址
producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
// 设置重试次数
producer.setRetryTimesWhenSendFailed(3);
try {
// 启动生产者
producer.start();
log.info("RocketMQ生产者启动成功");
} catch (Exception e) {
log.error("RocketMQ生产者启动失败", e);
throw new RuntimeException("生产者初始化失败", e);
}
}
/**
* 发送同步消息
* @param messageBody 消息体
* @return 发送结果
*/
@PostMapping("/sendSync")
@Operation(summary = "发送同步消息", description = "向指定Topic发送同步消息,确保消息送达")
public Map<String, Object> sendSyncMessage(@RequestBody Map<String, Object> messageBody) {
Map<String, Object> result = Maps.newHashMap();
try {
// 参数校验
String topic = (String) messageBody.get("topic");
String tags = (String) messageBody.get("tags");
String content = (String) messageBody.get("content");
StringUtils.hasText(topic, "Topic不能为空");
StringUtils.hasText(content, "消息内容不能为空");
// 构建消息对象
Message message = new Message(
topic,
tags,
content.getBytes("UTF-8")
);
// 发送同步消息
SendResult sendResult = producer.send(message);
log.info("消息发送成功,结果:{}", JSON.toJSONString(sendResult));
// 封装返回结果
result.put("success", true);
result.put("msgId", sendResult.getMsgId());
result.put("queueId", sendResult.getMessageQueue().getQueueId());
result.put("offset", sendResult.getQueueOffset());
} catch (Exception e) {
log.error("消息发送失败", e);
result.put("success", false);
result.put("errorMsg", e.getMessage());
}
return result;
}
/**
* 销毁生产者(Spring容器关闭时执行)
*/
@javax.annotation.PreDestroy
public void destroyProducer() {
if (producer != null) {
producer.shutdown();
log.info("RocketMQ生产者已关闭");
}
}
}
4.3 消费者实现(集群消费)
package com.jam.demo.consumer;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* RocketMQ消费者示例(集群消费)
* @author ken
*/
@RestController
@RequestMapping("/rocketmq/consumer")
@Tag(name = "RocketMQ消费者接口", description = "消息消费相关接口")
@Slf4j
public class ClusterConsumerController {
private DefaultMQPushConsumer consumer;
/**
* 初始化消费者
*/
@PostConstruct
public void initConsumer() {
try {
// 创建消费者实例,指定消费者组
consumer = new DefaultMQPushConsumer("jam-consumer-group");
// 设置NameServer地址
consumer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
// 订阅Topic和Tag(*表示所有Tag)
consumer.subscribe("jam-test-topic", "*");
// 设置消费模式为集群模式(默认)
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
// 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (ObjectUtils.isEmpty(msgs)) {
log.warn("消费消息为空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
String content = new String(msg.getBody(), "UTF-8");
log.info("消费消息成功,msgId:{},内容:{}", msg.getMsgId(), content);
// 这里可添加业务处理逻辑(如写入数据库)
} catch (Exception e) {
log.error("消费消息失败", e);
// 重试消费(最多重试16次,超过则进入死信队列)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
log.info("RocketMQ消费者启动成功");
} catch (Exception e) {
log.error("RocketMQ消费者启动失败", e);
throw new RuntimeException("消费者初始化失败", e);
}
}
/**
* 获取消费者状态
* @return 状态信息
*/
@GetMapping("/status")
@Operation(summary = "获取消费者状态", description = "查询消费者是否正常运行")
public String getConsumerStatus() {
if (consumer != null && consumer.getDefaultMQPushConsumerImpl().isStarted()) {
return "消费者运行正常";
} else {
return "消费者已停止";
}
}
/**
* 销毁消费者(Spring容器关闭时执行)
*/
@javax.annotation.PreDestroy
public void destroyConsumer() {
if (consumer != null) {
consumer.shutdown();
log.info("RocketMQ消费者已关闭");
}
}
}
4.4 启动类配置
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
/**
* RocketMQ示例项目启动类
* @author ken
*/
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
@OpenAPIDefinition(info = @Info(title = "RocketMQ Demo API", version = "1.0", description = "RocketMQ快速入门示例接口文档"))
public class RocketMqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqDemoApplication.class, args);
}
}
4.5 测试验证
- 启动SpringBoot项目,访问
http://localhost:8080/swagger-ui.html打开Swagger文档。 - 调用
/rocketmq/producer/sendSync接口,传入参数:
{
"topic": "jam-test-topic",
"tags": "test",
"content": "Hello RocketMQ!"
}
- 查看控制台日志,消费者成功打印“消费消息成功,msgId:xxx,内容:Hello RocketMQ!”即测试通过。
五、进阶应用:消息可靠性与事务消息
5.1 消息发送模式对比
| 模式 | 特点 | 适用场景 |
| 同步发送 | 阻塞等待结果,可靠性最高 | 重要业务(订单创建、支付通知) |
| 异步发送 | 回调通知结果,性能较高 | 非核心但需确认送达的业务 |
| 单向发送 | 无需返回结果,性能最高 | 日志收集、埋点数据 |
异步发送示例:
/**
* 发送异步消息
* @param messageBody 消息体
* @return 发送结果
*/
@PostMapping("/sendAsync")
@Operation(summary = "发送异步消息", description = "向指定Topic发送异步消息,通过回调获取结果")
public Map<String, Object> sendAsyncMessage(@RequestBody Map<String, Object> messageBody) {
Map<String, Object> result = Maps.newHashMap();
try {
String topic = (String) messageBody.get("topic");
String content = (String) messageBody.get("content");
StringUtils.hasText(topic, "Topic不能为空");
StringUtils.hasText(content, "消息内容不能为空");
Message message = new Message(topic, content.getBytes("UTF-8"));
// 异步发送,通过回调处理结果
producer.send(message, (sendResult, e) -> {
if (e != null) {
log.error("异步消息发送失败", e);
} else {
log.info("异步消息发送成功,msgId:{}", sendResult.getMsgId());
}
});
result.put("success", true);
result.put("msg", "消息已异步发送,可查看日志获取结果");
} catch (Exception e) {
log.error("异步消息发送异常", e);
result.put("success", false);
result.put("errorMsg", e.getMessage());
}
return result;
}
5.2 事务消息实现(分布式事务)
RocketMQ事务消息通过“半消息+回查”机制保证分布式事务一致性(参考RocketMQ官方事务消息文档)。
5.2.1 事务生产者
package com.jam.demo.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.*;
/**
* RocketMQ事务消息生产者
* @author ken
*/
@RestController
@RequestMapping("/rocketmq/transaction")
@Tag(name = "事务消息接口", description = "分布式事务消息发送接口")
@Slf4j
public class TransactionProducerController {
private TransactionMQProducer producer;
@PostConstruct
public void initTransactionProducer() {
producer = new TransactionMQProducer("jam-transaction-group");
producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String content = new String(msg.getBody(), "UTF-8");
log.info("执行本地事务,消息内容:{}", content);
// 模拟本地事务(如数据库操作)
boolean localTxSuccess = executeDbOperation(content);
if (localTxSuccess) {
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
} catch (Exception e) {
log.error("本地事务执行失败", e);
return LocalTransactionState.UNKNOW; // 未知状态,等待回查
}
}
/**
* 事务回查(解决本地事务结果未知的情况)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
String msgId = msg.getMsgId();
log.info("回查本地事务,msgId:{}", msgId);
// 检查本地事务状态(如查询数据库)
boolean txSuccess = checkDbOperation(msgId);
if (txSuccess) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("事务回查失败", e);
return LocalTransactionState.UNKNOW;
}
}
});
// 设置线程池处理事务
producer.setExecutorService(new ThreadPoolExecutor(
5,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
Executors.defaultThreadFactory()
));
try {
producer.start();
log.info("事务消息生产者启动成功");
} catch (Exception e) {
log.error("事务消息生产者启动失败", e);
throw new RuntimeException("事务生产者初始化失败", e);
}
}
/**
* 发送事务消息
* @param messageBody 消息体
* @return 发送结果
*/
@PostMapping("/send")
@Operation(summary = "发送事务消息", description = "发送分布式事务消息,保证本地事务与消息发送一致性")
public Map<String, Object> sendTransactionMessage(@RequestBody Map<String, Object> messageBody) {
Map<String, Object> result = Maps.newHashMap();
try {
String topic = (String) messageBody.get("topic");
String content = (String) messageBody.get("content");
StringUtils.hasText(topic, "Topic不能为空");
StringUtils.hasText(content, "消息内容不能为空");
Message message = new Message(topic, content.getBytes("UTF-8"));
// 发送事务消息(arg为自定义参数)
producer.sendMessageInTransaction(message, null);
result.put("success", true);
result.put("msg", "事务消息已发送,等待本地事务执行结果");
} catch (Exception e) {
log.error("事务消息发送失败", e);
result.put("success", false);
result.put("errorMsg", e.getMessage());
}
return result;
}
/**
* 模拟数据库操作(本地事务)
*/
private boolean executeDbOperation(String content) {
// 实际业务中可替换为MyBatisPlus操作数据库
log.info("执行数据库操作,内容:{}", content);
return true; // 模拟成功
}
/**
* 模拟检查数据库事务状态
*/
private boolean checkDbOperation(String msgId) {
log.info("检查数据库事务状态,msgId:{}", msgId);
return true; // 模拟事务成功
}
@PreDestroy
public void destroyProducer() {
if (producer != null) {
producer.shutdown();
log.info("事务消息生产者已关闭");
}
}
}
六、常见问题排查
6.1 NameServer启动失败
- 原因:JDK版本不兼容或端口被占用(默认9876)。
- 解决:使用JDK8+,执行
netstat -tulpn | grep 9876查看端口占用并释放。
6.2 Broker无法注册到NameServer
- 原因:namesrvAddr配置错误或防火墙拦截端口。
- 解决:检查NameServer地址配置,开放9876和Broker端口(如10911)。
6.3 消息发送失败
- 原因:Topic未创建或Broker不可用。
- 解决:通过
mqadmin updateTopic创建Topic,或检查Broker状态。
6.4 消息消费重复
- 原因:消费者返回RECONSUME_LATER导致重试,或网络波动。
- 解决:业务逻辑实现幂等性(如通过msgId去重)。
七、总结
本文从RocketMQ核心概念出发,深入解析集群架构原理,手把手完成多Master多Slave集群部署,并通过Java代码实现生产者、消费者及事务消息的开发。RocketMQ的高可用依赖合理的集群架构,而消息可靠性则需要结合业务场景选择合适的发送模式和消费策略。掌握这些知识后,你不仅能快速搭建RocketMQ集群,还能解决生产环境中的常见问题,让消息中间件真正成为分布式系统的“稳定基石”。