在分布式系统架构中,消息中间件是实现异步通信、解耦服务、削峰填谷的核心组件。Apache RocketMQ凭借其高吞吐、低延迟、高可用的特性,成为阿里系及众多企业的首选消息中间件。本文将从RocketMQ的核心底层逻辑出发,结合企业级实战场景,全面讲解API开发、架构设计、问题排查与优化,让你既能夯实基础,又能直接落地生产。
一、RocketMQ核心概念与底层逻辑
1. 核心组件与角色分工
RocketMQ的架构由四大核心组件构成,各司其职:
- NameServer:轻量级路由中心,存储Topic与Broker的映射关系,无状态设计支持集群扩展。
- Broker:消息存储与转发核心,负责接收、存储、投递消息,支持主从架构保证高可用。
- Producer:消息生产者,负责创建并发送消息到Broker,支持集群部署。
- Consumer:消息消费者,从Broker拉取或接收消息并处理,支持推/拉两种消费模式。
辅助概念:
- Topic:消息主题,逻辑上的消息分类,生产者发送消息到指定Topic,消费者订阅Topic消费。
- MessageQueue:Topic的物理分区,每个Topic可划分为多个MessageQueue,实现负载均衡和顺序消费。
- Offset:消息在MessageQueue中的偏移量,标记消费进度。
2. 底层核心逻辑
(1)路由发现机制
Producer发送消息前需获取Topic的路由信息(即该Topic分布在哪些Broker的哪些MessageQueue),流程如下:
(2)消息存储机制
Broker采用CommitLog+ConsumeQueue+IndexFile的三层存储结构:
- CommitLog:所有Topic的消息混合存储在一个日志文件中,顺序写入保证性能。
- ConsumeQueue:Topic的消息索引文件,记录消息在CommitLog中的偏移量、大小等,加速消费查找。
- IndexFile:基于哈希索引的消息查询文件,支持按Key快速查询消息。
(3)消费模式
- 推模式(Push):Broker主动推送消息给Consumer,实时性高,Consumer需设置监听器处理消息。
- 拉模式(Pull):Consumer主动从Broker拉取消息,可控性强,适合批量消费场景。
二、RocketMQ环境搭建
1. 服务端安装(Linux环境)
(1)下载并解压
# 下载最新稳定版(5.1.4)
wget https://archive.apache.org/dist/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
unzip rocketmq-all-5.1.4-bin-release.zip -d /usr/local/rocketmq
cd /usr/local/rocketmq
(2)配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile
(3)启动NameServer
# 修改JVM内存(根据服务器配置调整)
sed -i 's/-Xms4g -Xmx4g/-Xms1g -Xmx1g/g' bin/runserver.sh
# 启动NameServer(后台运行)
nohup sh bin/mqnamesrv > namesrv.log 2>&1 &
# 验证启动(输出"Name Server boot success"表示成功)
tail -f namesrv.log
(4)启动Broker
# 修改JVM内存
sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
# 启动Broker(指定NameServer地址)
nohup sh bin/mqbroker -n 192.168.1.100:9876 > broker.log 2>&1 &
# 验证启动(输出"broker boot success"表示成功)
tail -f broker.log
2. 客户端依赖配置
SpringBoot项目中引入以下Maven依赖(最新稳定版):
<dependencies>
<!-- SpringBoot核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.5</version>
</dependency>
<!-- RocketMQ客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>
<!-- Lombok(简化代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Swagger3(接口文档) -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.2.0</version>
</dependency>
<!-- MyBatisPlus(持久层) -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>runtime</scope>
</dependency>
<!-- Fastjson2(JSON处理) -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.48</version>
</dependency>
<!-- Guava(集合工具) -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.0-jre</version>
</dependency>
</dependencies>
三、RocketMQ核心API实战
1. 普通消息生产与消费
(1)生产者配置类
package com.jam.demo.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
/**
* RocketMQ生产者配置类
* @author ken
*/
@Configuration
@Slf4j
public class RocketMQProducerConfig {
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化默认生产者
* @return DefaultMQProducer
*/
@Bean
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServerAddr);
// 设置同步发送重试次数
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
log.info("RocketMQ生产者启动成功,nameServerAddr:{},producerGroup:{}", nameServerAddr, producerGroup);
} catch (Exception e) {
log.error("RocketMQ生产者启动失败", e);
throw new RuntimeException("RocketMQ生产者初始化失败", e);
}
return producer;
}
}
(2)普通消息生产者服务
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/**
* 普通消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class NormalMessageProducerService {
private final DefaultMQProducer defaultMQProducer;
/**
* 发送普通消息(同步)
* @param topic 主题(必填)
* @param tags 标签(可选)
* @param keys 消息键(可选,用于消息查询)
* @param body 消息体(必填)
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendNormalMessage(String topic, String tags, String keys, String body) throws Exception {
// 参数校验
if (!StringUtils.hasText(topic)) {
throw new IllegalArgumentException("topic不能为空");
}
if (!StringUtils.hasText(body)) {
throw new IllegalArgumentException("body不能为空");
}
// 构建消息(topic+tags+keys+body)
Message message = new Message(topic, tags, keys, body.getBytes("UTF-8"));
// 同步发送消息
SendResult sendResult = defaultMQProducer.send(message);
log.info("发送普通消息成功,topic:{},tags:{},keys:{},msgId:{},queueId:{}",
topic, tags, keys, sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
return sendResult;
}
/**
* 发送异步消息
* @param topic 主题
* @param tags 标签
* @param keys 消息键
* @param body 消息体
*/
public void sendAsyncMessage(String topic, String tags, String keys, String body) {
if (!StringUtils.hasText(topic) || !StringUtils.hasText(body)) {
throw new IllegalArgumentException("topic和body不能为空");
}
Message message = new Message(topic, tags, keys, body.getBytes());
// 异步发送回调
defaultMQProducer.send(message, (sendResult, e) -> {
if (e == null) {
log.info("异步发送成功,msgId:{}", sendResult.getMsgId());
} else {
log.error("异步发送失败", e);
}
});
}
}
(3)普通消息消费者服务
package com.jam.demo.service;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* 普通消息消费者服务(推模式)
* @author ken
*/
@Service
@Slf4j
public class NormalMessageConsumerService {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化推模式消费者
* @throws MQClientException 初始化异常
*/
@PostConstruct
public void initPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServerAddr);
// 设置从最新位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 设置最大重试次数
consumer.setMaxReconsumeTimes(5);
// 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 订阅主题(*表示所有标签)
consumer.subscribe("demo_topic", "order");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
log.info("消费普通消息成功,topic:{},tags:{},keys:{},body:{},msgId:{},reconsumeTimes:{}",
msg.getTopic(), msg.getTags(), msg.getKeys(), body, msg.getMsgId(), msg.getReconsumeTimes());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费普通消息失败", e);
// 重试消费(达到最大次数后进入死信队列)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
consumer.start();
log.info("RocketMQ推模式消费者启动成功,consumerGroup:{}", consumerGroup);
}
}
(4)测试接口
package com.jam.demo.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* 消息测试控制器
* @author ken
*/
@RestController
@RequestMapping("/message")
@Tag(name = "消息测试接口", description = "RocketMQ消息发送测试")
@Slf4j
@RequiredArgsConstructor
public class MessageTestController {
private final NormalMessageProducerService normalMessageProducerService;
@PostMapping("/sendNormal")
@Operation(summary = "发送同步普通消息", description = "发送同步RocketMQ消息到demo_topic")
public String sendNormalMessage(
@Parameter(description = "主题", required = true, example = "demo_topic") @RequestParam String topic,
@Parameter(description = "标签", example = "order") @RequestParam(required = false) String tags,
@Parameter(description = "消息键", example = "order_1001") @RequestParam(required = false) String keys,
@Parameter(description = "消息体", required = true, example = "{\"orderId\":\"1001\",\"amount\":99}") @RequestParam String body) {
try {
SendResult sendResult = normalMessageProducerService.sendNormalMessage(topic, tags, keys, body);
return "发送成功,msgId:" + sendResult.getMsgId();
} catch (Exception e) {
log.error("发送普通消息失败", e);
return "发送失败:" + e.getMessage();
}
}
@PostMapping("/sendAsync")
@Operation(summary = "发送异步普通消息", description = "发送异步RocketMQ消息到demo_topic")
public String sendAsyncMessage(
@Parameter(description = "主题", required = true) @RequestParam String topic,
@Parameter(description = "标签") @RequestParam(required = false) String tags,
@Parameter(description = "消息键") @RequestParam(required = false) String keys,
@Parameter(description = "消息体", required = true) @RequestParam String body) {
try {
normalMessageProducerService.sendAsyncMessage(topic, tags, keys, body);
return "异步发送请求已提交";
} catch (Exception e) {
log.error("发送异步消息失败", e);
return "发送失败:" + e.getMessage();
}
}
}
(5)配置文件(application.yml)
server:
port: 8080
rocketmq:
name-server: 192.168.1.100:9876
producer:
group: demo_producer_group
consumer:
group: demo_consumer_group
spring:
datasource:
url: jdbc:mysql://localhost:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/**/*.xml
type-aliases-package: com.jam.demo.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
api-docs:
path: /v3/api-docs
packages-to-scan: com.jam.demo.controller
2. 顺序消息生产与消费
顺序消息要求同一业务流程的消息按顺序生产和消费(如订单创建→支付→发货),需保证消息发送到同一个MessageQueue,且消费时单线程处理该Queue。
(1)顺序消息生产者
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* 顺序消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderMessageProducerService {
private final DefaultMQProducer defaultMQProducer;
/**
* 发送顺序消息(按业务ID选择MessageQueue)
* @param topic 主题
* @param tags 标签
* @param keys 消息键
* @param body 消息体
* @param businessId 业务ID(如订单ID,用于选择Queue)
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendOrderMessage(String topic, String tags, String keys, String body, String businessId) throws Exception {
if (!StringUtils.hasText(topic) || !StringUtils.hasText(body) || !StringUtils.hasText(businessId)) {
throw new IllegalArgumentException("topic、body、businessId不能为空");
}
Message message = new Message(topic, tags, keys, body.getBytes("UTF-8"));
// 按businessId哈希选择MessageQueue(保证同一业务ID的消息进入同一Queue)
SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String id = (String) arg;
int hash = id.hashCode() % mqs.size();
return mqs.get(Math.abs(hash));
}
}, businessId);
log.info("发送顺序消息成功,businessId:{},queueId:{},msgId:{}",
businessId, sendResult.getMessageQueue().getQueueId(), sendResult.getMsgId());
return sendResult;
}
}
(2)顺序消息消费者
package com.jam.demo.service;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* 顺序消息消费者服务
* @author ken
*/
@Service
@Slf4j
public class OrderMessageConsumerService {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
/**
* 初始化顺序消费者
* @throws MQClientException 初始化异常
*/
@PostConstruct
public void initOrderConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServerAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("demo_topic", "order");
// 注册顺序消息监听器(单线程处理每个MessageQueue)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
context.setAutoCommit(true); // 自动提交偏移量
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
String businessId = msg.getKeys().split("_")[1]; // 从keys解析业务ID
log.info("消费顺序消息成功,businessId:{},body:{},queueId:{}",
businessId, body, msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
log.error("消费顺序消息失败", e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; // 暂停当前Queue消费
}
});
consumer.start();
log.info("顺序消息消费者启动成功");
}
}
3. 批量消息生产
批量消息可减少网络请求次数,提升发送效率,但需注意单批消息大小不超过4MB。
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 批量消息生产者服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class BatchMessageProducerService {
private final DefaultMQProducer defaultMQProducer;
/**
* 发送批量消息
* @param topic 主题
* @param tags 标签
* @param messageList 消息列表
* @return SendResult 发送结果
* @throws Exception 发送异常
*/
public SendResult sendBatchMessage(String topic, String tags, List<String> messageList) throws Exception {
if (CollectionUtils.isEmpty(messageList)) {
throw new IllegalArgumentException("消息列表不能为空");
}
List<Message> msgs = new ArrayList<>();
for (String body : messageList) {
Message msg = new Message(topic, tags, "batch_" + System.currentTimeMillis(), body.getBytes("UTF-8"));
msgs.add(msg);
}
// 发送批量消息
SendResult sendResult = defaultMQProducer.send(msgs);
log.info("发送批量消息成功,数量:{},msgId:{}", messageList.size(), sendResult.getMsgId());
return sendResult;
}
}
四、企业级架构设计与落地
1. 高可用架构设计
RocketMQ的高可用依赖NameServer集群和Broker主从集群:
(1)NameServer集群搭建
NameServer无状态,只需启动多个节点即可,Producer/Consumer配置多个NameServer地址(用分号分隔):
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.101:9876
(2)Broker主从集群搭建
- 主节点配置(broker-a.properties):
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0 # 0表示主节点
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER # 同步主节点(实时同步到从节点)
flushDiskType=SYNC_FLUSH # 同步刷盘(消息写入即刷盘)
storePathRootDir=/data/rocketmq/store/master
storePathCommitLog=/data/rocketmq/store/master/commitlog
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
- 从节点配置(broker-a-s.properties):
brokerClusterName=DefaultCluster
brokerName=broker-a # 与主节点同名
brokerId=1 # 非0表示从节点
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH
storePathRootDir=/data/rocketmq/store/slave
storePathCommitLog=/data/rocketmq/store/slave/commitlog
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
- 启动主从节点:
nohup sh mqbroker -c conf/broker-a.properties > broker-a.log 2>&1 &
nohup sh mqbroker -c conf/broker-a-s.properties > broker-a-s.log 2>&1 &
2. 消息可靠性保障
消息可靠性是企业级场景的核心需求,需从生产、存储、消费三个环节保障:
(1)生产环节:重试机制+异步刷盘确认
- 生产者设置重试次数:
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送重试
- 选择SYNC_FLUSH刷盘模式,确保消息写入Broker磁盘后才返回成功。
(2)存储环节:主从同步+持久化
- Broker设置为SYNC_MASTER,主节点消息实时同步到从节点;
- 开启CommitLog持久化,消息写入后落盘到磁盘。
(3)消费环节:重试机制+死信队列
- 消费者设置最大重试次数,失败后进入死信队列:
consumer.setMaxReconsumeTimes(5);
- 监听死信队列处理失败消息:
consumer.subscribe("%DLQ%demo_consumer_group", "*"); // 死信队列命名规则:%DLQ%+消费者组名
3. 幂等性处理
重复消费是消息中间件的常见问题(如网络抖动导致重试),需通过幂等性设计避免业务异常。
(1)基于业务唯一键的幂等实现
① 数据库表设计
CREATE TABLE `message_consume_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`business_key` varchar(64) NOT NULL COMMENT '业务唯一键(如订单ID)',
`consume_status` varchar(16) NOT NULL COMMENT '消费状态:UNCONSUMED/CONSUMED',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_business_key` (`business_key`) COMMENT '唯一索引保证幂等'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息消费记录表';
② 实体类与Mapper
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息消费记录实体
* @author ken
*/
@Data
@TableName("message_consume_record")
public class MessageConsumeRecord {
@TableId(type = IdType.AUTO)
private Long id;
private String businessKey;
private String consumeStatus;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.MessageConsumeRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* 消息消费记录Mapper
* @author ken
*/
@Mapper
public interface MessageConsumeRecordMapper extends BaseMapper<MessageConsumeRecord> {
}
③ 幂等消费服务
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.MessageConsumeRecord;
import com.jam.demo.mapper.MessageConsumeRecordMapper;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/**
* 幂等消费服务
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class IdempotentConsumeService {
private final MessageConsumeRecordMapper consumeRecordMapper;
/**
* 处理幂等消费
* @param businessKey 业务唯一键
* @param consumeLogic 消费逻辑
* @return 消费状态
*/
public ConsumeConcurrentlyStatus handleIdempotent(String businessKey, Runnable consumeLogic) {
if (!StringUtils.hasText(businessKey)) {
log.error("业务唯一键不能为空");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 检查是否已消费
LambdaQueryWrapper<MessageConsumeRecord> queryWrapper = new LambdaQueryWrapper<MessageConsumeRecord>()
.eq(MessageConsumeRecord::getBusinessKey, businessKey);
MessageConsumeRecord record = consumeRecordMapper.selectOne(queryWrapper);
if (record == null) {
try {
// 执行消费逻辑
consumeLogic.run();
// 插入消费记录
MessageConsumeRecord newRecord = new MessageConsumeRecord();
newRecord.setBusinessKey(businessKey);
newRecord.setConsumeStatus("CONSUMED");
consumeRecordMapper.insert(newRecord);
log.info("幂等消费成功,businessKey:{}", businessKey);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费逻辑执行失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} else {
log.info("消息已消费,businessKey:{}", businessKey);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
④ 消费者集成幂等服务
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String businessKey = msg.getKeys(); // 业务唯一键放在keys中
return idempotentConsumeService.handleIdempotent(businessKey, () -> {
// 具体消费逻辑(如订单处理)
String body = new String(msg.getBody(), "UTF-8");
log.info("执行订单处理逻辑:{}", body);
});
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
4. 事务消息实现
RocketMQ通过半消息机制实现分布式事务,解决跨服务的数据一致性问题(如订单创建与库存扣减)。
(1)事务消息生产者
package com.jam.demo.service;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
/**
* 事务消息生产者服务
* @author ken
*/
@Service
@Slf4j
public class TransactionMessageProducerService {
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddr;
private TransactionMQProducer transactionProducer;
/**
* 初始化事务生产者
*/
@PostConstruct
public void initTransactionProducer() {
transactionProducer = new TransactionMQProducer(producerGroup);
transactionProducer.setNamesrvAddr(nameServerAddr);
// 设置事务监听器
transactionProducer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务(如扣减库存)
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String businessKey = msg.getKeys();
log.info("执行本地事务,businessKey:{}", businessKey);
try {
// 模拟本地事务(如数据库操作)
boolean localTxSuccess = true; // 实际场景需替换为真实业务逻辑
if (localTxSuccess) {
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
} catch (Exception e) {
log.error("本地事务执行异常", e);
return LocalTransactionState.UNKNOW; // 未知状态,等待回查
}
}
/**
* 事务回查(Broker主动查询本地事务状态)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String businessKey = msg.getKeys();
log.info("事务回查,businessKey:{}", businessKey);
// 模拟查询本地事务状态
boolean txSuccess = true;
return txSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
});
try {
transactionProducer.start();
log.info("事务消息生产者启动成功");
} catch (Exception e) {
log.error("事务生产者启动失败", e);
throw new RuntimeException("事务生产者初始化失败", e);
}
}
/**
* 发送事务消息
* @param topic 主题
* @param tags 标签
* @param keys 业务键
* @param body 消息体
* @param arg 附加参数
*/
public void sendTransactionMessage(String topic, String tags, String keys, String body, Object arg) {
Message message = new Message(topic, tags, keys, body.getBytes());
try {
transactionProducer.sendMessageInTransaction(message, arg);
log.info("事务消息发送请求提交成功,keys:{}", keys);
} catch (Exception e) {
log.error("发送事务消息失败", e);
throw new RuntimeException("事务消息发送失败", e);
}
}
}
(2)事务消息测试接口
@PostMapping("/sendTransaction")
@Operation(summary = "发送事务消息", description = "发送RocketMQ事务消息")
public String sendTransactionMessage(
@Parameter(description = "主题", required = true) @RequestParam String topic,
@Parameter(description = "标签", required = true) @RequestParam String tags,
@Parameter(description = "业务键", required = true) @RequestParam String keys,
@Parameter(description = "消息体", required = true) @RequestParam String body) {
try {
transactionMessageProducerService.sendTransactionMessage(topic, tags, keys, body, null);
return "事务消息请求提交成功,业务键:" + keys;
} catch (Exception e) {
log.error("发送事务消息失败", e);
return "发送失败:" + e.getMessage();
}
}
五、问题排查与性能优化
1. 常见问题排查
(1)消息丢失
- 排查方向:生产者是否重试、Broker是否同步刷盘/主从同步、消费者是否确认消费。
- 工具:使用
mqadmin命令查看消息状态:
# 查看Topic消息累计数
sh mqadmin topicStatus -n 192.168.1.100:9876 -t demo_topic
# 查看Broker消息存储状态
sh mqadmin brokerStatus -n 192.168.1.100:9876 -b 192.168.1.100:10911
(2)消息堆积
- 排查方向:消费者消费速度是否低于生产者发送速度、消费者是否异常。
- 解决方法:增加消费线程数、优化消费逻辑、拆分Topic分区。
(3)重复消费
- 排查方向:消费者是否返回RECONSUME_LATER、网络是否抖动。
- 解决方法:实现幂等消费、调整重试次数。
2. 性能优化
(1)Broker优化
- 调整JVM内存(建议8G以上):
sed -i 's/-Xms2g -Xmx2g/-Xms8g -Xmx8g -Xmn4g/g' bin/runbroker.sh
- 开启文件内存映射(mmap):
mapedFileSizeCommitLog=1073741824 # CommitLog文件大小设为1GB
- 调整刷盘线程数:
flushCommitLogThreadPoolNums=4
flushConsumeQueueThreadPoolNums=2
(2)生产者优化
- 使用异步发送或批量发送;
- 合理设置消息压缩(
producer.setCompressMsgBodyOverHowmuch(1024*1024))。
(3)消费者优化
- 增加消费线程数(
consumer.setConsumeThreadMax(128)); - 批量消费(
consumer.setConsumeMessageBatchMaxSize(32)); - 避免消费逻辑中耗时操作(如远程调用)。
六、总结
RocketMQ作为一款高性能、高可用的消息中间件,已成为企业分布式架构的核心组件。本文从底层逻辑出发,讲解了核心API开发、企业级架构设计、可靠性保障、幂等性处理及性能优化等实战内容,所有示例代码均可直接落地生产。
在实际项目中,需结合业务场景选择合适的消息类型(普通/顺序/事务),通过集群部署保障高可用,通过幂等设计保障数据一致性。同时,需关注消息链路的监控与排查,确保系统稳定运行。
掌握RocketMQ不仅能提升分布式系统的设计能力,更能解决实际业务中的异步通信、削峰填谷等核心问题,助力企业架构升级。