RocketMQ, the distributed messaging middleware open-sourced by Alibaba (now a top-level Apache project), has become a core component of microservice architectures thanks to its high throughput, low latency, and high availability. This article dissects the core mechanisms of the Broker, Consumer, and Producer at the source-code level and combines them with hands-on examples to reveal the underlying principles, so that you understand not only the what but also the why.
1. RocketMQ Core Architecture Overview
First, build an overall picture of the architecture; the source-code analysis below revolves around these core components:
Core component responsibilities
- NameServer: a lightweight registry that manages Broker routing information and supports dynamic scale-out and scale-in
- Broker: the core message-storage node, handling message storage, delivery, filtering, and other core logic
- Producer: the message producer, supporting synchronous, asynchronous, and one-way sends
- Consumer: the message consumer, supporting push and pull modes as well as clustering and broadcasting consumption
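A key consequence of this layout is that Producers and Consumers never hard-code Broker addresses: they only know the NameServer, which hands out Topic route data. A minimal wiring sketch (the topic and group names here are illustrative, and a local NameServer on the default port 9876 is assumed):
package com.jam.demo.overview;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * Minimal wiring sketch: both client roles only know the NameServer
 */
public class ArchitectureSketch {
    public static void main(String[] args) throws Exception {
        // Consumer: resolves Broker addresses through the NameServer route data
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("arch_demo_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("arch_demo_topic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
                ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
        // Producer: same NameServer, no Broker address anywhere in client code
        DefaultMQProducer producer = new DefaultMQProducer("arch_demo_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.send(new Message("arch_demo_topic", "demo".getBytes()));
        // Give the push consumer a moment to receive the message (demo only)
        TimeUnit.SECONDS.sleep(2);
        producer.shutdown();
        consumer.shutdown();
    }
}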
2. Deep Dive into Producer Source Code
2.1 Producer Startup Flow
Core code walkthrough:
package com.jam.demo.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.springframework.util.StringUtils;
/**
 * Core logic of Producer startup
 * @author ken
 */
@Slf4j
public class ProducerStartupAnalysis {
public DefaultMQProducer createProducer(String groupName) throws MQClientException {
        // 1. Validate parameters
if (!StringUtils.hasText(groupName)) {
throw new MQClientException("Producer group name is null", null);
}
DefaultMQProducer producer = new DefaultMQProducer(groupName);
        // The NameServer address must be set (setNamesrvAddr) before start() in a real deployment.
        // 2. start() creates/reuses the MQClientInstance (the core client instance)
        producer.start();
        // 3. Scheduled task: refresh route info from the NameServer (every 30s by default),
        //    see MQClientInstance#startScheduledTask
return producer;
}
}
2.2 Core Message-Send Flow
The core entry point for sending is DefaultMQProducerImpl#sendDefaultImpl. The overall flow: look up (or fetch from the NameServer) the topic's route info, select a message queue with latency-based fault avoidance, call sendKernelImpl to perform the actual remoting call, and, for synchronous sends, retry on failure up to retryTimesWhenSendFailed additional times.
2.2.1 Core message-send code
package com.jam.demo.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
/**
 * Core implementation of message sending
 * @author ken
 */
@Slf4j
public class MessageSendAnalysis {
private final DefaultMQProducer producer;
public MessageSendAnalysis(DefaultMQProducer producer) {
this.producer = producer;
}
    /**
     * Core logic for synchronous sending
     * @param message the message
     * @return the send result
     * @throws MQClientException client-side exception
     * @throws RemotingException remote communication exception
     * @throws MQBrokerException broker-side exception
     * @throws InterruptedException if interrupted
     */
public SendResult sendMessage(Message message)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1. Validate the message
        validateMessage(message);
        // 2. Send the message (the core call, which delegates to DefaultMQProducerImpl#sendDefaultImpl)
        SendResult sendResult = producer.send(message);
        // 3. Handle the send result
handleSendResult(sendResult, message);
return sendResult;
}
    /**
     * Validate message parameters
     */
private void validateMessage(Message message) throws MQClientException {
if (ObjectUtils.isEmpty(message)) {
throw new MQClientException("Message is null", null);
}
if (!org.springframework.util.StringUtils.hasText(message.getTopic())) {
throw new MQClientException("Message topic is null", null);
}
if (ObjectUtils.isEmpty(message.getBody())) {
throw new MQClientException("Message body is null", null);
}
}
    /**
     * Handle the send result
     */
private void handleSendResult(SendResult sendResult, Message message) {
if (sendResult != null) {
switch (sendResult.getSendStatus()) {
case SEND_OK:
log.info("消息发送成功,topic={}, msgId={}", message.getTopic(), sendResult.getMsgId());
break;
default:
log.warn("消息发送异常,status={}, msgId={}", sendResult.getSendStatus(), sendResult.getMsgId());
}
}
}
    // Demo entry point
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_demo_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
MessageSendAnalysis analysis = new MessageSendAnalysis(producer);
Message message = new Message("demo_topic", "demo_tag", "Hello RocketMQ".getBytes());
SendResult result = analysis.sendMessage(message);
log.info("发送结果:{}", result);
TimeUnit.SECONDS.sleep(1);
producer.shutdown();
}
}
2.2.2 Message queue selection strategy
RocketMQ offers several queue-selection strategies on the producer side. By default, MQFaultStrategy#selectOneMessageQueue picks queues round-robin across the topic, optionally combined with latency-based fault avoidance when sendLatencyFaultEnable is turned on (AllocateMessageQueueAveragely, by contrast, is the consumer-side rebalance strategy):
package com.jam.demo.producer;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Queue selection strategy (a simplified model of MQFaultStrategy#selectOneMessageQueue)
 * @author ken
 */
public class QueueSelectorAnalysis {
    // Owned by MQFaultStrategy in the real client; the concrete LatencyFaultToleranceImpl
    // constructor differs between 4.x and 5.x, so the dependency is injected here.
    private final LatencyFaultTolerance<String> latencyFaultTolerance;
    private final AtomicInteger roundRobinIndex = new AtomicInteger(0);
    public QueueSelectorAnalysis(LatencyFaultTolerance<String> latencyFaultTolerance) {
        this.latencyFaultTolerance = latencyFaultTolerance;
    }
    /**
     * Default selection strategy: round-robin plus latency-based fault avoidance
     */
    public MessageQueue selectQueue(List<MessageQueue> mqs, Message msg, Object arg) {
        if (mqs == null || mqs.isEmpty()) {
            return null;
        }
        // If a selector argument is supplied, use it directly (select-by-hash style)
        if (arg != null) {
            int index = Integer.parseInt(arg.toString());
            return mqs.get(Math.abs(index) % mqs.size());
        }
        // Round-robin, skipping brokers currently marked unavailable by the fault-tolerance component
        for (int i = 0; i < mqs.size(); i++) {
            int index = Math.abs(roundRobinIndex.getAndIncrement()) % mqs.size();
            MessageQueue mq = mqs.get(index);
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                return mq;
            }
        }
        // If every broker is marked unavailable, fall back to the first queue
        return mqs.get(0);
    }
}
2.3 Message-Send Retry Mechanism
The Producer ships with a built-in retry mechanism. Key parameters (a configuration sketch follows this list):
- retryTimesWhenSendFailed: retry count for synchronous sends (default 2)
- retryTimesWhenSendAsyncFailed: retry count for asynchronous sends (default 2)
- retryAnotherBrokerWhenNotStoreOK: whether to fall back to another Broker when the store result is not OK (default false)
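A minimal configuration sketch for these parameters on DefaultMQProducer (the group name and NameServer address are placeholders):
package com.jam.demo.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
 * Retry-parameter configuration sketch
 */
public class RetryConfigSketch {
    public static DefaultMQProducer newRetryTunedProducer() {
        DefaultMQProducer producer = new DefaultMQProducer("retry_demo_group");
        producer.setNamesrvAddr("localhost:9876");
        // Extra attempts after a failed synchronous send (total attempts = 1 + this value)
        producer.setRetryTimesWhenSendFailed(2);
        // Extra attempts for asynchronous sends
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // Allow retrying on a different Broker when the store result is not SEND_OK
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        return producer;
    }
}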
Core implementation:
package com.jam.demo.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
/**
 * Message-send retry mechanism
 * @author ken
 */
@Slf4j
public class RetryMechanismAnalysis {
private final DefaultMQProducer producer;
    private int retryTimes = 2; // 2 retries by default
public RetryMechanismAnalysis(DefaultMQProducer producer) {
this.producer = producer;
this.retryTimes = producer.getRetryTimesWhenSendFailed();
}
    /**
     * Send a message with retries
     */
public void sendWithRetry(Message message, List<MessageQueue> mqs) {
for (int i = 0; i <= retryTimes; i++) {
MessageQueue mq = selectQueue(mqs);
try {
                // Send the message
                producer.send(message, mq);
                log.info("Message sent successfully, attempt={}", i);
                return;
            } catch (Exception e) {
                log.error("Message send failed, attempt={}", i, e);
                // On the last attempt, propagate the failure
                if (i == retryTimes) {
                    throw new RuntimeException("Message send ultimately failed", e);
                }
                // Back off briefly before retrying
try {
Thread.sleep(100 * (i + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
private MessageQueue selectQueue(List<MessageQueue> mqs) {
return mqs.get((int) (System.currentTimeMillis() % mqs.size()));
}
}
3. Deep Dive into Broker Source Code
3.1 Broker Core Storage Mechanism
The Broker uses a hybrid storage layout built around three kinds of files:
- CommitLog: the primary message store; messages of all Topics are appended to it sequentially
- ConsumeQueue: the consume queue, holding fixed-size index entries that point to message positions in the CommitLog
- IndexFile: the message index file, supporting lookups by message key
3.1.1 CommitLog write path
package com.jam.demo.broker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
/**
 * CommitLog write mechanism
 * @author ken
 */
@Slf4j
public class CommitLogAnalysis {
    private final DefaultMessageStore messageStore;
    private final CommitLog commitLog;
    public CommitLogAnalysis(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
        this.commitLog = messageStore.getCommitLog();
    }
    /**
     * Core message-store flow
     */
    public boolean putMessage(MessageExtBrokerInner msg) {
        // 1. Validate the message
        if (msg == null || msg.getBody() == null) {
            log.error("Message is empty and cannot be stored");
            return false;
        }
        // 2. Append to the CommitLog. Core logic of CommitLog#putMessage:
        //    - obtain the current MappedFile
        //    - compute the serialized message length
        //    - check that the file has enough remaining space
        //    - write the message
        //    - advance the write offset
        log.debug("Appending message via {}", commitLog.getClass().getSimpleName());
        // 3. Handle disk flush
        handleFlush(msg);
        // 4. Handle master-slave replication
        handleReplica(msg);
        return true;
    }
    /**
     * Flush-strategy handling
     */
    private void handleFlush(MessageExtBrokerInner msg) {
        MessageStoreConfig config = messageStore.getMessageStoreConfig();
        if (FlushDiskType.SYNC_FLUSH == config.getFlushDiskType()) {
            // Synchronous flush: the write thread submits a GroupCommitRequest and waits
            // until GroupCommitService confirms the data is on disk
            log.info("SYNC_FLUSH: waiting for the group-commit service");
        } else {
            // Asynchronous flush: handled by the background FlushRealTimeService thread,
            // which flushes roughly every 500ms by default
            log.info("ASYNC_FLUSH: flush delegated to FlushRealTimeService");
        }
    }
    /**
     * Master-slave replication handling
     */
    private void handleReplica(MessageExtBrokerInner msg) {
        // Handled according to the SYNC_MASTER / ASYNC_MASTER role;
        // HAService is responsible for master-slave replication
    }
}
3.1.2 ConsumeQueue construction
The ConsumeQueue is the logical queue of messages; each queue of each Topic maps to its own set of ConsumeQueue files:
package com.jam.demo.broker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
/**
 * ConsumeQueue construction and lookup
 * @author ken
 */
@Slf4j
public class ConsumeQueueAnalysis {
    private final DefaultMessageStore messageStore;
    public ConsumeQueueAnalysis(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
    }
    /**
     * Read the index entry for a given logical queue offset
     */
    public SelectMappedBufferResult getIndexInfo(String topic, int queueId, long offset) {
        // 1. Locate the ConsumeQueue instance
        ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            log.error("ConsumeQueue not found, topic={}, queueId={}", topic, queueId);
            return null;
        }
        // 2. Read the index entry at the given offset.
        //    Each ConsumeQueue entry is a fixed 20 bytes:
        //    8-byte CommitLog offset + 4-byte message size + 8-byte tag hash code
        return consumeQueue.getIndexBuffer(offset);
    }
    /**
     * Build the ConsumeQueue index (driven by the ReputMessageService thread)
     */
    public void buildIndex(String topic, int queueId, long commitLogOffset, int msgSize, long tagsCode) {
        ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            // In the real broker the entry is written by ConsumeQueue#putMessagePositionInfoWrapper,
            // invoked after ReputMessageService dispatches a DispatchRequest for the stored message
            log.info("Dispatch index entry: commitLogOffset={}, size={}, tagsCode={}",
                    commitLogOffset, msgSize, tagsCode);
        }
    }
}
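To make the fixed 20-byte entry layout concrete, here is a small self-contained sketch (plain Java, no RocketMQ dependency) that encodes and decodes one entry in the same field order described above:
package com.jam.demo.broker;
import java.nio.ByteBuffer;
/**
 * Sketch of the 20-byte ConsumeQueue entry layout
 */
public class ConsumeQueueEntrySketch {
    /** One ConsumeQueue entry: 8 + 4 + 8 bytes. */
    public static final int CQ_STORE_UNIT_SIZE = 20;
    /** Encode one entry: CommitLog offset, message size, tag hash code. */
    public static byte[] encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagsCode);
        return buf.array();
    }
    public static void main(String[] args) {
        byte[] entry = encode(1_073_741_824L, 256, "tag1".hashCode());
        ByteBuffer buf = ByteBuffer.wrap(entry);
        // A lookup reads the three fields back in the same order
        System.out.printf("commitLogOffset=%d, size=%d, tagsCode=%d%n",
                buf.getLong(), buf.getInt(), buf.getLong());
    }
}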
3.2 Broker Request Handling
The Broker handles client requests over Netty. The core request processors are:
- SendMessageProcessor: handles message-send requests
- PullMessageProcessor: handles message-pull requests
- QueryMessageProcessor: handles message-query requests
package com.jam.demo.broker;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
// In RocketMQ 4.x, RequestCode and the header classes live under org.apache.rocketmq.common.protocol instead
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.DefaultMessageStore;
/**
 * Core logic of the Broker request processors
 * @author ken
 */
@Slf4j
public class BrokerRequestProcessor {
    private final DefaultMessageStore messageStore;
    public BrokerRequestProcessor(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
    }
    /**
     * Dispatch a client request by its request code
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                return handleSendMessage(ctx, request);
            case RequestCode.PULL_MESSAGE:
                return handlePullMessage(ctx, request);
            default:
                return RemotingCommand.createResponseCommand(null);
        }
    }
    /**
     * Handle a message-send request
     */
    private RemotingCommand handleSendMessage(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        SendMessageRequestHeader header =
                (SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
        log.info("Received send request, topic={}, queueId={}", header.getTopic(), header.getQueueId());
        // 1. Build the internal MessageExtBrokerInner object
        // 2. Store the message in the CommitLog via the message store
        // 3. Return the response
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        response.setCode(0); // 0 == ResponseCode.SUCCESS
        response.setRemark(null);
        return response;
    }
    /**
     * Handle a message-pull request
     */
    private RemotingCommand handlePullMessage(ChannelHandlerContext ctx, RemotingCommand request) {
        // Pull handling: locate the ConsumeQueue, read index entries,
        // then read the message bodies from the CommitLog
        return RemotingCommand.createResponseCommand(null);
    }
}
3.3 Broker Disk Flush and Master-Slave Replication
3.3.1 Flush-strategy implementation
package com.jam.demo.broker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
/**
 * Flush-strategy implementation
 * @author ken
 */
@Slf4j
public class FlushStrategyAnalysis {
    private final MessageStoreConfig storeConfig;
    public FlushStrategyAnalysis(DefaultMessageStore messageStore) {
        this.storeConfig = messageStore.getMessageStoreConfig();
    }
    /**
     * Synchronous flush: under SYNC_FLUSH the write thread submits a GroupCommitRequest
     * and blocks until GroupCommitService confirms data up to the given offset is on disk
     */
    public void syncFlush(long offset) {
        if (FlushDiskType.SYNC_FLUSH == storeConfig.getFlushDiskType()) {
            log.info("Synchronous flush completed up to offset={}", offset);
        }
    }
    /**
     * Asynchronous flush (modeled on CommitLog's FlushRealTimeService background thread)
     */
    public class AsyncFlushService extends Thread {
        private volatile boolean isRunning = true;
        @Override
        public void run() {
            log.info("Async flush service started");
            while (isRunning) {
                try {
                    // Flush interval, 500ms by default (flushIntervalCommitLog)
                    Thread.sleep(storeConfig.getFlushIntervalCommitLog());
                    // The real service flushes the CommitLog's mapped files here
                    log.debug("Async flush tick");
                } catch (InterruptedException e) {
                    log.warn("Async flush thread interrupted", e);
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            log.info("Async flush service stopped");
        }
        public void shutdown() {
            isRunning = false;
            this.interrupt();
        }
    }
}
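As a rough illustration of what a synchronous flush waits for, here is a simplified, self-contained model of the GroupCommitRequest / GroupCommitService idea (plain Java, not the broker's actual classes): the write thread submits a request and blocks until a background flush thread completes it.
package com.jam.demo.broker;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Simplified model of synchronous group-commit flushing
 */
public class GroupCommitSketch {
    /** One flush request: the writer waits on the latch until the flush thread completes it. */
    static final class FlushRequest {
        final long targetOffset;
        final CountDownLatch latch = new CountDownLatch(1);
        FlushRequest(long targetOffset) { this.targetOffset = targetOffset; }
        void done() { latch.countDown(); }
        boolean await(long timeoutMs) throws InterruptedException {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }
    private final BlockingQueue<FlushRequest> requests = new LinkedBlockingQueue<>();
    /** Background "group commit" thread: drains requests, flushes, then wakes the writers. */
    public void startFlushThread() {
        Thread flusher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    FlushRequest req = requests.take();
                    // The real broker would flush the mapped file up to req.targetOffset here
                    req.done();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "group-commit-sketch");
        flusher.setDaemon(true);
        flusher.start();
    }
    /** Write path under SYNC_FLUSH: block until the flush is confirmed or the wait times out. */
    public boolean syncFlush(long offset, long timeoutMs) throws InterruptedException {
        FlushRequest request = new FlushRequest(offset);
        requests.put(request);
        return request.await(timeoutMs);
    }
    public static void main(String[] args) throws InterruptedException {
        GroupCommitSketch sketch = new GroupCommitSketch();
        sketch.startFlushThread();
        System.out.println("flush ok = " + sketch.syncFlush(1024L, 5000));
    }
}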
4. Deep Dive into Consumer Source Code
4.1 Consumer Startup Flow
DefaultMQPushConsumer#start largely mirrors the Producer: it builds (or reuses) the shared MQClientInstance, copies the subscription data, initializes the OffsetStore according to the message model, starts the rebalance and pull-message services, and then registers the consumer group and sends a heartbeat to the Broker.
4.2 Consumption Modes
RocketMQ supports two consumption modes:
4.2.1 Push mode (the default)
Push mode is essentially long polling, implemented around DefaultMQPushConsumerImpl#pullMessage: the client keeps issuing pull requests, and when no new message is available the Broker's PullRequestHoldService holds the request open for a while instead of returning empty, so delivery feels like push without busy polling:
package com.jam.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Push-mode consumption example
 * @author ken
 */
@Slf4j
public class PushConsumerAnalysis {
public void startPushConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo_group");
consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the Topic
consumer.subscribe("demo_topic", "*");
        // Register the message listener
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
            // Process the messages
            for (MessageExt msg : msgs) {
                log.info("Consumed message: topic={}, msgId={}, body={}",
                        msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
        // Start the consumer
consumer.start();
log.info("推模式消费者启动成功");
}
public static void main(String[] args) throws Exception {
PushConsumerAnalysis analysis = new PushConsumerAnalysis();
analysis.startPushConsumer();
        // Keep the process alive
Thread.currentThread().join();
}
}
4.2.2 Pull mode
In pull mode the consumer decides when and how often to pull:
package com.jam.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Pull-mode (lite pull) consumption example
 * @author ken
 */
@Slf4j
public class PullConsumerAnalysis {
public void startPullConsumer() throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        // Commit offsets manually via commitSync() instead of relying on auto-commit
        consumer.setAutoCommit(false);
        // Subscribe to the Topic
        consumer.subscribe("demo_topic", "*");
        // Start the consumer
        consumer.start();
        log.info("Pull-mode consumer started");
        // Pull messages in a loop
while (true) {
List<MessageExt> msgs = consumer.poll();
if (!CollectionUtils.isEmpty(msgs)) {
for (MessageExt msg : msgs) {
log.info("拉取消息:topic={}, msgId={}, body={}",
msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
}
                // Commit the consumed offsets (manual commit, since auto-commit is disabled above)
consumer.commitSync();
}
            // Throttle the pull frequency
Thread.sleep(1000);
}
}
public static void main(String[] args) throws Exception {
PullConsumerAnalysis analysis = new PullConsumerAnalysis();
analysis.startPullConsumer();
}
}
4.3 Consume Offset Management
RocketMQ manages consume offsets through the OffsetStore abstraction:
package com.jam.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.common.message.MessageQueue;
// In RocketMQ 4.x, MessageModel lives under org.apache.rocketmq.common.protocol.heartbeat instead
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.util.Set;
/**
 * Consume offset management
 * @author ken
 */
@Slf4j
public class OffsetManagementAnalysis {
    private final DefaultMQPushConsumer consumer;
    private OffsetStore offsetStore;
    public OffsetManagementAnalysis(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
        initOffsetStore();
    }
    /**
     * Initialize the offset store:
     * clustering mode keeps offsets on the Broker, broadcasting mode keeps them in a local file
     */
    private void initOffsetStore() {
        // Note the unusual getter casing (getmQClientFactory) in the client source
        if (MessageModel.BROADCASTING == consumer.getMessageModel()) {
            offsetStore = new LocalFileOffsetStore(
                    consumer.getDefaultMQPushConsumerImpl().getmQClientFactory(),
                    consumer.getConsumerGroup());
        } else {
            offsetStore = new RemoteBrokerOffsetStore(
                    consumer.getDefaultMQPushConsumerImpl().getmQClientFactory(),
                    consumer.getConsumerGroup());
        }
    }
    /**
     * Update the in-memory offset
     */
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        offsetStore.updateOffset(mq, offset, increaseOnly);
        log.info("Offset updated: topic={}, queueId={}, offset={}", mq.getTopic(), mq.getQueueId(), offset);
    }
    /**
     * Persist the offset of a single queue
     */
    public void persistOffset(MessageQueue mq) {
        offsetStore.persist(mq);
        log.info("Offset persisted: topic={}, queueId={}", mq.getTopic(), mq.getQueueId());
    }
    /**
     * Persist the offsets of a batch of queues (OffsetStore#persistAll takes the queue set)
     */
    public void persistAllOffsets(Set<MessageQueue> mqs) {
        offsetStore.persistAll(mqs);
        log.info("Persisted offsets for {} queues", mqs.size());
    }
    /**
     * Read the current offset (memory first, then the store)
     */
    public long queryOffset(MessageQueue mq) {
        return offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
    }
}
4.4 Consumption Retry Mechanism
When consumption fails, RocketMQ redelivers the message via a retry topic (%RETRY% + ConsumerGroup), backing off according to the Broker's delay levels; once the maximum retry count is exceeded, the message is moved to the dead-letter topic (%DLQ% + ConsumerGroup):
package com.jam.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Consumption retry mechanism
 * @author ken
 */
@Slf4j
public class ConsumeRetryAnalysis {
public void startConsumerWithRetry() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_retry_group");
consumer.setNamesrvAddr("localhost:9876");
        // Maximum reconsume times for this consumer (default is 16 for concurrent consumption)
consumer.setMaxReconsumeTimes(3);
        // Subscribe to the business Topic; the retry topic %RETRY%consumer_retry_group is subscribed automatically
consumer.subscribe("demo_topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
                        // Simulate a consumption failure for the first two deliveries
                        if (msg.getReconsumeTimes() < 2) {
                            log.info("Simulated consumption failure, reconsumeTimes={}", msg.getReconsumeTimes());
                            throw new RuntimeException("Consumption failed, retry needed");
                        }
                        // Normal consumption
                        log.info("Consumed successfully: msgId={}, reconsumeTimes={}", msg.getMsgId(), msg.getReconsumeTimes());
                    } catch (Exception e) {
                        log.error("Consumption error", e);
                        // Check whether the maximum retry count has been reached
                        if (msg.getReconsumeTimes() >= consumer.getMaxReconsumeTimes()) {
                            log.error("Max retries reached, message will no longer be retried: msgId={}", msg.getMsgId());
                            // Acknowledge here to stop retries; if RECONSUME_LATER were returned instead,
                            // the Broker would eventually route the message to %DLQ%consumer_retry_group
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("带重试机制的消费者启动成功");
}
public static void main(String[] args) throws Exception {
ConsumeRetryAnalysis analysis = new ConsumeRetryAnalysis();
analysis.startConsumerWithRetry();
Thread.currentThread().join();
}
}
5. Hands-On: Core RocketMQ Features
5.1 Distributed Transactional Messages
RocketMQ implements distributed transactional messages with a two-phase protocol: the producer first sends a half (prepared) message that is invisible to consumers, then executes the local transaction and commits or rolls back; if the outcome stays unknown, the Broker periodically calls back checkLocalTransaction to resolve it:
package com.jam.demo.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;
import java.util.concurrent.*;
/**
 * Distributed transactional message demo
 * @author ken
 */
@Slf4j
public class TransactionMessageDemo {
public static void main(String[] args) throws Exception {
        // Create the transactional producer
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
        // Thread pool used to run local transactions and transaction-state checks
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("transaction-thread");
return thread;
});
producer.setExecutorService(executorService);
        // Register the transaction listener
producer.setTransactionListener(new TransactionListener() {
            /**
             * Execute the local transaction
             */
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getKeys();
log.info("执行本地事务,transactionId={}", transactionId);
try {
// 执行业务逻辑:扣减库存、创建订单等
boolean success = executeBusinessLogic(transactionId);
if (success) {
log.info("本地事务执行成功,提交消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.info("本地事务执行失败,回滚消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("本地事务执行异常", e);
return LocalTransactionState.UNKNOW;
}
}
            /**
             * Transaction state check (called back by the Broker when the outcome is unknown)
             */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getKeys();
log.info("事务回查,transactionId={}", transactionId);
// 查询本地事务状态
boolean isCommit = checkBusinessStatus(transactionId);
if (isCommit) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
        // Send the half (transactional) message
String transactionId = "tx_" + System.currentTimeMillis();
Message message = new Message("transaction_topic", "tx_tag",
transactionId, "订单创建请求".getBytes());
producer.sendMessageInTransaction(message, null);
log.info("事务消息发送成功,transactionId={}", transactionId);
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
executorService.shutdown();
}
    /**
     * Execute the business logic
     */
    private static boolean executeBusinessLogic(String transactionId) {
        // Simulate the business outcome: fail when the transaction id ends with 0
        return !transactionId.endsWith("0");
}
    /**
     * Check the business state
     */
    private static boolean checkBusinessStatus(String transactionId) {
        // Simulate a database lookup of the local transaction state
return StringUtils.hasText(transactionId);
}
}
5.2 Message Filtering
RocketMQ supports Tag filtering and SQL92 (property-based) filtering; SQL92 filtering additionally requires enablePropertyFilter=true in the Broker configuration. A producer-side sketch that sets the filtered properties follows the consumer demo below:
package com.jam.demo.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Message filtering in practice
 * @author ken
 */
@Slf4j
public class MessageFilterDemo {
public static void main(String[] args) throws Exception {
        // 1. Tag filtering
DefaultMQPushConsumer tagConsumer = new DefaultMQPushConsumer("tag_filter_consumer");
tagConsumer.setNamesrvAddr("localhost:9876");
        // Consume only messages tagged tag1 or tag2
tagConsumer.subscribe("filter_topic", "tag1 || tag2");
        tagConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("Tag过滤消费:tag={}, body={}", msg.getTags(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
tagConsumer.start();
        // 2. SQL92 filtering (requires enablePropertyFilter=true on the Broker)
DefaultMQPushConsumer sqlConsumer = new DefaultMQPushConsumer("sql_filter_consumer");
sqlConsumer.setNamesrvAddr("localhost:9876");
        // Consume only messages whose property a > 5 and b = 'abc'
sqlConsumer.subscribe("filter_topic", MessageSelector.bySql("a > 5 AND b = 'abc'"));
        sqlConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("SQL过滤消费:a={}, b={}, body={}",
msg.getUserProperty("a"), msg.getUserProperty("b"), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
sqlConsumer.start();
log.info("过滤消费者启动成功");
}
}
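For the SQL92 consumer above to match anything, the producer must attach the user properties being filtered on (and the Broker must have enablePropertyFilter=true). A minimal producer-side sketch; the producer group name is illustrative, while the topic, tag, and property values mirror the consumer example:
package com.jam.demo.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
 * Producer-side sketch for the filtering demo
 */
public class FilterProducerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // The tag is matched by the Tag-filter consumer, the user properties by the SQL92 consumer
        Message message = new Message("filter_topic", "tag1", "filtered payload".getBytes());
        message.putUserProperty("a", "10");
        message.putUserProperty("b", "abc");
        producer.send(message);
        producer.shutdown();
    }
}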
6. Performance Tuning and Best Practices
6.1 Producer Tuning
package com.jam.demo.optimize;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/**
 * Producer tuning configuration
 * @author ken
 */
public class ProducerOptimize {
    public DefaultMQProducer createOptimizedProducer() {
        DefaultMQProducer producer = new DefaultMQProducer("optimize_producer_group");
        // Compress message bodies larger than 4KB to save bandwidth
        producer.setCompressMsgBodyOverHowmuch(1024 * 4);
        // Send timeout
        producer.setSendMsgTimeout(3000);
        // Retry configuration
        producer.setRetryTimesWhenSendFailed(2);
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        // Maximum message size (4MB by default)
        producer.setMaxMessageSize(1024 * 1024 * 4);
        // For throughput, batch by passing a message collection to send();
        // for latency, use asynchronous sends with a SendCallback (see the sketch below)
        return producer;
    }
    /**
     * Batch-send example: send() accepts a collection of messages to the same topic
     */
    public void batchSend(DefaultMQProducer producer) throws Exception {
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add(new Message("batch_topic", "batch_tag", ("Batch message " + i).getBytes()));
        }
        SendResult result = producer.send(messages);
        System.out.println("Batch send result: " + result);
    }
}
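Asynchronous sending, mentioned above as the latency lever, uses the client's SendCallback; a minimal sketch (the group name and topic are placeholders):
package com.jam.demo.optimize;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * Asynchronous send sketch
 */
public class AsyncSendSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async_demo_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message("demo_topic", "async_tag", "Hello async".getBytes());
        // The calling thread returns immediately; the callback fires when the Broker responds
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Async send ok, msgId=" + sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        // Give the callback a moment before shutting down (demo only)
        TimeUnit.SECONDS.sleep(1);
        producer.shutdown();
    }
}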
6.2 Broker Tuning
Core Broker tuning configuration (broker.conf); note that in a properties file a # comment must sit on its own line rather than after a value:
# Storage paths
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
# Flush strategy (asynchronous flush is recommended in production)
flushDiskType=ASYNC_FLUSH
# File sizes: 1GB per CommitLog file
mapedFileSizeCommitLog=1073741824
# ConsumeQueue file size in bytes (default: 300,000 entries x 20 bytes)
mapedFileSizeConsumeQueue=6000000
# Expired-file cleanup: delete at 04:00, keep files for 72 hours
deleteWhen=04
fileReservedTime=72
# Thread pools
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
# Networking
listenPort=10911
serverWorkerThreads=8
serverCallbackExecutorThreads=8
7. Summary
This article dissected RocketMQ's three core components, Producer, Broker, and Consumer, at the source-code level, and covered through code walkthroughs and hands-on examples:
- Producer: the send flow, queue-selection strategy, and retry mechanism
- Broker: the storage mechanism, flush strategies, and master-slave replication
- Consumer: consumption modes, offset management, and the retry mechanism
- Hands-on: distributed transactions, message filtering, and performance tuning
RocketMQ's design embodies the high-performance, high-availability, and scalability principles of distributed systems; understanding its source code helps you use and tune RocketMQ more effectively and troubleshoot problems in real production environments.
Appendix: Complete pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jam.demo</groupId>
<artifactId>rocketmq-source-analysis</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<rocketmq.version>5.1.4</rocketmq.version>
<lombok.version>1.18.30</lombok.version>
<spring.version>6.1.2</spring.version>
</properties>
<dependencies>
<!-- RocketMQ client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Spring Context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring Utils -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
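<!-- RocketMQ store/broker modules: only needed to compile the Broker source-walkthrough
classes in section 3 (org.apache.rocketmq.store.*); the client-side examples need rocketmq-client alone -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-store</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version>
</dependency>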
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
</dependencies>
</project>