RocketMQ集群部署与快速入门全解密:从原理到实战,万字干货吃透消息中间件

简介: 本文详解Apache RocketMQ核心概念、多Master多Slave集群部署及Java实战,涵盖NameServer、Broker配置、消息收发、事务消息与故障排查,助你掌握分布式消息系统搭建与应用。

引言

在分布式系统架构中,消息中间件是实现异步通信、解耦服务、削峰填谷的核心组件。Apache RocketMQ作为阿里开源的分布式消息队列,凭借高吞吐、低延迟、高可靠的特性,成为金融、电商、物流等领域的首选方案。但很多开发者对RocketMQ的集群部署原理模糊,入门实践踩坑不断。本文将从底层逻辑到实战操作,全方位解密RocketMQ集群部署与快速入门,让你既能夯实基础,又能解决实际生产问题。

一、RocketMQ核心概念拆解

要搞懂集群部署,先吃透核心组件和术语——这是理解RocketMQ运行逻辑的基石,所有设计都围绕这些组件展开(参考RocketMQ官方文档V5.1.4核心架构说明)。

1.1 核心组件

  • NameServer:RocketMQ的“通讯录”,负责管理Broker节点的注册与发现,保存Topic与Broker的映射关系。无状态设计,集群节点间不通信,客户端通过轮询NameServer获取最新Broker地址。
  • Broker:消息的“快递站”,负责消息的存储、发送和消费。Broker分为Master(主节点)和Slave(从节点),Master负责读写,Slave仅做备份和读,保证高可用。
  • Producer:消息生产者,负责发送消息到Broker,支持同步、异步、单向三种发送模式。
  • Consumer:消息消费者,从Broker拉取或推送消息消费,支持集群消费(同一Topic消息只被消费一次)和广播消费(所有消费者都收到消息)。

1.2 关键术语

  • Topic:消息的“分类标签”,生产者按Topic发消息,消费者按Topic订阅消息。
  • Queue:Topic的物理分区,每个Topic可划分为多个Queue,分布在不同Broker上,实现负载均衡和并行消费。
  • Message:消息载体,包含主题、标签、内容、属性等信息。
  • Offset:消息在Queue中的位置标识,消费者通过Offset记录消费进度。

二、RocketMQ集群架构深度解析

RocketMQ集群部署模式决定了系统的可用性和性能,官方推荐生产环境采用“多Master多Slave”架构(参考RocketMQ官方部署指南)。

2.1 集群模式分类

  1. 单Master模式:仅1个Master节点,简单但无高可用,适合测试环境。
  2. 多Master模式:多个Master节点,无Slave,性能高但单个Master故障会丢失数据。
  3. 多Master多Slave模式:每个Master配1个Slave,支持两种同步策略:
  • 同步双写(SYNC_MASTER):Master和Slave同步写入成功后才返回生产者,数据零丢失但性能略降。
  • 异步复制(ASYNC_MASTER):Master写入成功后立即返回,异步同步到Slave,性能高但极端情况可能丢数据。

2.2 集群架构图

image.png

(注:该架构为生产环境推荐的“3Master+3Slave同步双写”模式)

三、RocketMQ集群部署实战(Linux环境)

3.1 环境准备

  • 操作系统:CentOS 7.9(64位)
  • JDK版本:17.0.9(RocketMQ 5.x要求JDK 8+,推荐JDK17)
  • RocketMQ版本:5.1.4(最新稳定版)
  • 服务器规划:
节点 IP地址 角色
node1 192.168.1.10 NameServer/Broker-Master1
node2 192.168.1.11 NameServer/Broker-Master2
node3 192.168.1.12 NameServer/Broker-Master3
node4 192.168.1.13 Broker-Slave1
node5 192.168.1.14 Broker-Slave2
node6 192.168.1.15 Broker-Slave3

3.2 安装JDK17

# 下载JDK17
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
# 解压
tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/
# 配置环境变量
echo "export JAVA_HOME=/usr/local/jdk-17.0.9" >> /etc/profile
echo "export PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile
source /etc/profile
# 验证
java -version # 输出jdk-17.0.9即成功

3.3 安装RocketMQ

# 下载RocketMQ 5.1.4
wget https://archive.apache.org/dist/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
# 解压
unzip rocketmq-all-5.1.4-bin-release.zip -d /usr/local/
mv /usr/local/rocketmq-all-5.1.4-bin-release /usr/local/rocketmq
# 配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile

3.4 配置NameServer集群

NameServer集群无需额外配置,只需在每个节点启动NameServer即可:

# 在node1/node2/node3分别执行
nohup sh $ROCKETMQ_HOME/bin/mqnamesrv &
# 查看日志验证启动
tail -f ~/logs/rocketmqlogs/namesrv.log
# 日志出现"NameServer startup successfully"即成功

3.5 配置Broker集群

3.5.1 创建Broker配置文件

在node1(Master1)创建broker-m1.properties

# 集群名称

brokerClusterName=JamCluster

# Broker名称

brokerName=broker-m1

# Broker ID(Master为0,Slave为1+)

brokerId=0

# NameServer地址(多个用;分隔)

namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876

# 监听端口

listenPort=10911

# 存储路径

storePathRootDir=/usr/local/rocketmq/store/m1

storePathCommitLog=/usr/local/rocketmq/store/m1/commitlog

# 同步策略(同步双写)

brokerRole=SYNC_MASTER

flushDiskType=SYNC_FLUSH

# 自动创建Topic

autoCreateTopicEnable=true

在node4(Slave1)创建broker-s1.properties

brokerClusterName=JamCluster

brokerName=broker-m1

brokerId=1

namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876

listenPort=10912

storePathRootDir=/usr/local/rocketmq/store/s1

storePathCommitLog=/usr/local/rocketmq/store/s1/commitlog

brokerRole=SLAVE

flushDiskType=SYNC_FLUSH

同理,在node2/node5创建broker-m2.propertiesbroker-s2.properties,node3/node6创建broker-m3.propertiesbroker-s3.properties(仅修改brokerName、brokerId、端口和存储路径)。

3.5.2 启动Broker集群

# 在node1启动Master1
nohup sh $ROCKETMQ_HOME/bin/mqbroker -c /usr/local/rocketmq/conf/broker-m1.properties &
# 在node4启动Slave1
nohup sh $ROCKETMQ_HOME/bin/mqbroker -c /usr/local/rocketmq/conf/broker-s1.properties &
# 同理启动node2/node5、node3/node6的Broker
# 查看Broker日志验证
tail -f ~/logs/rocketmqlogs/broker.log
# 日志出现"register broker to name server successfully"即成功

3.6 集群验证

# 使用RocketMQ工具查看集群状态
sh $ROCKETMQ_HOME/bin/mqadmin clusterList -n 192.168.1.10:9876
# 输出包含所有Master和Slave节点信息即集群部署成功

四、RocketMQ快速入门:Java客户端实战

4.1 项目依赖配置(Maven)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>

   <groupId>com.jam.demo</groupId>
   <artifactId>rocketmq-demo</artifactId>
   <version>1.0.0</version>

   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.0</version>
       <relativePath/>
   </parent>

   <dependencies>
       <!-- SpringBoot核心依赖 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <!-- RocketMQ客户端依赖 -->
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-client-java</artifactId>
           <version>5.1.4</version>
       </dependency>

       <!-- Lombok -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>1.18.30</version>
           <scope>provided</scope>
       </dependency>

       <!-- Spring工具类 -->
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context</artifactId>
           <version>6.1.1</version>
       </dependency>

       <!-- Fastjson2 -->
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>2.0.32</version>
       </dependency>

       <!-- Guava集合工具 -->
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>32.1.3-jre</version>
       </dependency>

       <!-- Swagger3 -->
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>2.2.0</version>
       </dependency>

       <!-- MyBatisPlus -->
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>3.5.5</version>
       </dependency>

       <!-- MySQL驱动 -->
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <version>8.0.33</version>
           <scope>runtime</scope>
       </dependency>
   </dependencies>

   <build>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <version>3.11.0</version>
               <configuration>
                   <source>17</source>
                   <target>17</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

4.2 生产者实现(同步发送)

package com.jam.demo.producer;

import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
* RocketMQ生产者示例(同步发送)
* @author ken
*/

@RestController
@RequestMapping("/rocketmq/producer")
@Tag(name = "RocketMQ生产者接口", description = "消息发送相关接口")
@Slf4j
public class SyncProducerController {

   private DefaultMQProducer producer;

   /**
    * 初始化生产者
    */

   @PostConstruct
   public void initProducer() {
       // 创建生产者实例,指定生产者组
       producer = new DefaultMQProducer("jam-producer-group");
       // 设置NameServer地址
       producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
       // 设置重试次数
       producer.setRetryTimesWhenSendFailed(3);
       try {
           // 启动生产者
           producer.start();
           log.info("RocketMQ生产者启动成功");
       } catch (Exception e) {
           log.error("RocketMQ生产者启动失败", e);
           throw new RuntimeException("生产者初始化失败", e);
       }
   }

   /**
    * 发送同步消息
    * @param messageBody 消息体
    * @return 发送结果
    */

   @PostMapping("/sendSync")
   @Operation(summary = "发送同步消息", description = "向指定Topic发送同步消息,确保消息送达")
   public Map<String, Object> sendSyncMessage(@RequestBody Map<String, Object> messageBody) {
       Map<String, Object> result = Maps.newHashMap();
       try {
           // 参数校验
           String topic = (String) messageBody.get("topic");
           String tags = (String) messageBody.get("tags");
           String content = (String) messageBody.get("content");
           StringUtils.hasText(topic, "Topic不能为空");
           StringUtils.hasText(content, "消息内容不能为空");

           // 构建消息对象
           Message message = new Message(
                   topic,
                   tags,
                   content.getBytes("UTF-8")
           );

           // 发送同步消息
           SendResult sendResult = producer.send(message);
           log.info("消息发送成功,结果:{}", JSON.toJSONString(sendResult));

           // 封装返回结果
           result.put("success", true);
           result.put("msgId", sendResult.getMsgId());
           result.put("queueId", sendResult.getMessageQueue().getQueueId());
           result.put("offset", sendResult.getQueueOffset());
       } catch (Exception e) {
           log.error("消息发送失败", e);
           result.put("success", false);
           result.put("errorMsg", e.getMessage());
       }
       return result;
   }

   /**
    * 销毁生产者(Spring容器关闭时执行)
    */

   @javax.annotation.PreDestroy
   public void destroyProducer()
{
       if (producer != null) {
           producer.shutdown();
           log.info("RocketMQ生产者已关闭");
       }
   }
}

4.3 消费者实现(集群消费)

package com.jam.demo.consumer;

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;

import javax.annotation.PostConstruct;
import java.util.List;

/**
* RocketMQ消费者示例(集群消费)
* @author ken
*/

@RestController
@RequestMapping("/rocketmq/consumer")
@Tag(name = "RocketMQ消费者接口", description = "消息消费相关接口")
@Slf4j
public class ClusterConsumerController {

   private DefaultMQPushConsumer consumer;

   /**
    * 初始化消费者
    */

   @PostConstruct
   public void initConsumer() {
       try {
           // 创建消费者实例,指定消费者组
           consumer = new DefaultMQPushConsumer("jam-consumer-group");
           // 设置NameServer地址
           consumer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
           // 订阅Topic和Tag(*表示所有Tag)
           consumer.subscribe("jam-test-topic", "*");
           // 设置消费模式为集群模式(默认)
           consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
           // 设置消费线程数
           consumer.setConsumeThreadMin(20);
           consumer.setConsumeThreadMax(64);
           // 注册消息监听器
           consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
               if (ObjectUtils.isEmpty(msgs)) {
                   log.warn("消费消息为空");
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
               for (MessageExt msg : msgs) {
                   try {
                       String content = new String(msg.getBody(), "UTF-8");
                       log.info("消费消息成功,msgId:{},内容:{}", msg.getMsgId(), content);
                       // 这里可添加业务处理逻辑(如写入数据库)
                   } catch (Exception e) {
                       log.error("消费消息失败", e);
                       // 重试消费(最多重试16次,超过则进入死信队列)
                       return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                   }
               }
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           });
           // 启动消费者
           consumer.start();
           log.info("RocketMQ消费者启动成功");
       } catch (Exception e) {
           log.error("RocketMQ消费者启动失败", e);
           throw new RuntimeException("消费者初始化失败", e);
       }
   }

   /**
    * 获取消费者状态
    * @return 状态信息
    */

   @GetMapping("/status")
   @Operation(summary = "获取消费者状态", description = "查询消费者是否正常运行")
   public String getConsumerStatus() {
       if (consumer != null && consumer.getDefaultMQPushConsumerImpl().isStarted()) {
           return "消费者运行正常";
       } else {
           return "消费者已停止";
       }
   }

   /**
    * 销毁消费者(Spring容器关闭时执行)
    */

   @javax.annotation.PreDestroy
   public void destroyConsumer()
{
       if (consumer != null) {
           consumer.shutdown();
           log.info("RocketMQ消费者已关闭");
       }
   }
}

4.4 启动类配置

package com.jam.demo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;

/**
* RocketMQ示例项目启动类
* @author ken
*/

@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
@OpenAPIDefinition(info = @Info(title = "RocketMQ Demo API", version = "1.0", description = "RocketMQ快速入门示例接口文档"))
public class RocketMqDemoApplication {

   public static void main(String[] args) {
       SpringApplication.run(RocketMqDemoApplication.class, args);
   }
}

4.5 测试验证

  1. 启动SpringBoot项目,访问http://localhost:8080/swagger-ui.html打开Swagger文档。
  2. 调用/rocketmq/producer/sendSync接口,传入参数:

{
   "topic": "jam-test-topic",
   "tags": "test",
   "content": "Hello RocketMQ!"
}

  1. 查看控制台日志,消费者成功打印“消费消息成功,msgId:xxx,内容:Hello RocketMQ!”即测试通过。

五、进阶应用:消息可靠性与事务消息

5.1 消息发送模式对比

模式 特点 适用场景
同步发送 阻塞等待结果,可靠性最高 重要业务(订单创建、支付通知)
异步发送 回调通知结果,性能较高 非核心但需确认送达的业务
单向发送 无需返回结果,性能最高 日志收集、埋点数据

异步发送示例

/**
* 发送异步消息
* @param messageBody 消息体
* @return 发送结果
*/

@PostMapping("/sendAsync")
@Operation(summary = "发送异步消息", description = "向指定Topic发送异步消息,通过回调获取结果")
public Map<String, Object> sendAsyncMessage(@RequestBody Map<String, Object> messageBody) {
   Map<String, Object> result = Maps.newHashMap();
   try {
       String topic = (String) messageBody.get("topic");
       String content = (String) messageBody.get("content");
       StringUtils.hasText(topic, "Topic不能为空");
       StringUtils.hasText(content, "消息内容不能为空");

       Message message = new Message(topic, content.getBytes("UTF-8"));
       // 异步发送,通过回调处理结果
       producer.send(message, (sendResult, e) -> {
           if (e != null) {
               log.error("异步消息发送失败", e);
           } else {
               log.info("异步消息发送成功,msgId:{}", sendResult.getMsgId());
           }
       });

       result.put("success", true);
       result.put("msg", "消息已异步发送,可查看日志获取结果");
   } catch (Exception e) {
       log.error("异步消息发送异常", e);
       result.put("success", false);
       result.put("errorMsg", e.getMessage());
   }
   return result;
}

5.2 事务消息实现(分布式事务)

RocketMQ事务消息通过“半消息+回查”机制保证分布式事务一致性(参考RocketMQ官方事务消息文档)。

5.2.1 事务生产者

package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.*;

/**
* RocketMQ事务消息生产者
* @author ken
*/

@RestController
@RequestMapping("/rocketmq/transaction")
@Tag(name = "事务消息接口", description = "分布式事务消息发送接口")
@Slf4j
public class TransactionProducerController {

   private TransactionMQProducer producer;

   @PostConstruct
   public void initTransactionProducer() {
       producer = new TransactionMQProducer("jam-transaction-group");
       producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
       // 设置事务监听器
       producer.setTransactionListener(new TransactionListener() {
           /**
            * 执行本地事务
            */

           @Override
           public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
               try {
                   String content = new String(msg.getBody(), "UTF-8");
                   log.info("执行本地事务,消息内容:{}", content);
                   // 模拟本地事务(如数据库操作)
                   boolean localTxSuccess = executeDbOperation(content);
                   if (localTxSuccess) {
                       return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
                   } else {
                       return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
                   }
               } catch (Exception e) {
                   log.error("本地事务执行失败", e);
                   return LocalTransactionState.UNKNOW; // 未知状态,等待回查
               }
           }

           /**
            * 事务回查(解决本地事务结果未知的情况)
            */

           @Override
           public LocalTransactionState checkLocalTransaction(MessageExt msg) {
               try {
                   String msgId = msg.getMsgId();
                   log.info("回查本地事务,msgId:{}", msgId);
                   // 检查本地事务状态(如查询数据库)
                   boolean txSuccess = checkDbOperation(msgId);
                   if (txSuccess) {
                       return LocalTransactionState.COMMIT_MESSAGE;
                   } else {
                       return LocalTransactionState.ROLLBACK_MESSAGE;
                   }
               } catch (Exception e) {
                   log.error("事务回查失败", e);
                   return LocalTransactionState.UNKNOW;
               }
           }
       });
       // 设置线程池处理事务
       producer.setExecutorService(new ThreadPoolExecutor(
               5,
               20,
               60,
               TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(2000),
               Executors.defaultThreadFactory()
       ));
       try {
           producer.start();
           log.info("事务消息生产者启动成功");
       } catch (Exception e) {
           log.error("事务消息生产者启动失败", e);
           throw new RuntimeException("事务生产者初始化失败", e);
       }
   }

   /**
    * 发送事务消息
    * @param messageBody 消息体
    * @return 发送结果
    */

   @PostMapping("/send")
   @Operation(summary = "发送事务消息", description = "发送分布式事务消息,保证本地事务与消息发送一致性")
   public Map<String, Object> sendTransactionMessage(@RequestBody Map<String, Object> messageBody) {
       Map<String, Object> result = Maps.newHashMap();
       try {
           String topic = (String) messageBody.get("topic");
           String content = (String) messageBody.get("content");
           StringUtils.hasText(topic, "Topic不能为空");
           StringUtils.hasText(content, "消息内容不能为空");

           Message message = new Message(topic, content.getBytes("UTF-8"));
           // 发送事务消息(arg为自定义参数)
           producer.sendMessageInTransaction(message, null);

           result.put("success", true);
           result.put("msg", "事务消息已发送,等待本地事务执行结果");
       } catch (Exception e) {
           log.error("事务消息发送失败", e);
           result.put("success", false);
           result.put("errorMsg", e.getMessage());
       }
       return result;
   }

   /**
    * 模拟数据库操作(本地事务)
    */

   private boolean executeDbOperation(String content) {
       // 实际业务中可替换为MyBatisPlus操作数据库
       log.info("执行数据库操作,内容:{}", content);
       return true; // 模拟成功
   }

   /**
    * 模拟检查数据库事务状态
    */

   private boolean checkDbOperation(String msgId) {
       log.info("检查数据库事务状态,msgId:{}", msgId);
       return true; // 模拟事务成功
   }

   @PreDestroy
   public void destroyProducer() {
       if (producer != null) {
           producer.shutdown();
           log.info("事务消息生产者已关闭");
       }
   }
}

六、常见问题排查

6.1 NameServer启动失败

  • 原因:JDK版本不兼容或端口被占用(默认9876)。
  • 解决:使用JDK8+,执行netstat -tulpn | grep 9876查看端口占用并释放。

6.2 Broker无法注册到NameServer

  • 原因:namesrvAddr配置错误或防火墙拦截端口。
  • 解决:检查NameServer地址配置,开放9876和Broker端口(如10911)。

6.3 消息发送失败

  • 原因:Topic未创建或Broker不可用。
  • 解决:通过mqadmin updateTopic创建Topic,或检查Broker状态。

6.4 消息消费重复

  • 原因:消费者返回RECONSUME_LATER导致重试,或网络波动。
  • 解决:业务逻辑实现幂等性(如通过msgId去重)。

七、总结

本文从RocketMQ核心概念出发,深入解析集群架构原理,手把手完成多Master多Slave集群部署,并通过Java代码实现生产者、消费者及事务消息的开发。RocketMQ的高可用依赖合理的集群架构,而消息可靠性则需要结合业务场景选择合适的发送模式和消费策略。掌握这些知识后,你不仅能快速搭建RocketMQ集群,还能解决生产环境中的常见问题,让消息中间件真正成为分布式系统的“稳定基石”。

目录
相关文章
|
4月前
|
消息中间件 存储 Java
企业实战RocketMQ:从API到架构开发的深度解析与落地实践
本文全面介绍了Apache RocketMQ消息中间件的核心技术与实战应用。首先解析了RocketMQ的四大核心组件(NameServer、Broker、Producer、Consumer)及其底层逻辑,包括路由发现机制和三层存储结构。接着详细演示了环境搭建、API开发(普通/顺序/批量/事务消息)、企业级架构设计(高可用集群、消息可靠性保障)和幂等性处理方案。最后提供了常见问题排查方法和性能优化建议,涵盖Broker配置、生产消费优化等关键点。所有示例代码均经过生产验证,可直接应用于实际项目开发。
307 2
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
588 0
|
消息中间件 数据可视化 Java
Linxu下RocketMq及可视化界面的搭建
Linxu下RocketMq配置信息及可视化界面的搭建
2321 0
|
4月前
|
消息中间件 存储 运维
RocketMQ 深度解剖:模块划分与集群原理的硬核解析
本文深入解析Apache RocketMQ的核心模块与集群原理,涵盖NameServer路由机制、Broker存储结构、Producer负载均衡及Consumer消费模式,结合实战案例与性能优化策略,全面掌握其在分布式系统中的高可用架构设计与应用实践。
371 5
|
4月前
|
消息中间件 存储 运维
RocketMQ监控与运维实战:从底层原理到生产落地全解析
本文深入解析RocketMQ监控与运维体系,涵盖核心架构、关键指标、实战工具及生产最佳实践,助你构建高可用消息系统。
360 4
|
8月前
|
消息中间件 缓存 监控
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
|
4月前
|
JSON 安全 JavaScript
深入浅出解析 HTTPS 原理
HTTPS是HTTP与SSL/TLS结合的安全协议,通过数字证书验证身份,利用非对称加密安全交换会话密钥,再以对称加密高效传输数据,确保通信的机密性、完整性和真实性。整个过程如同建立一条加密隧道,保障网络交互安全。
2282 16
|
11月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署