庖丁解牛:RocketMQ Broker/Consumer/Producer源码深度剖析与实战

简介: 本文深入剖析了RocketMQ的核心机制,从源码层面解析了Producer、Broker和Consumer三大组件。Producer部分详细分析了消息发送流程、队列选择策略和重试机制;Broker部分重点讲解了消息存储架构(CommitLog、ConsumeQueue)、请求处理和刷盘策略;Consumer部分则解析了推/拉模式、偏移量管理和重试机制。通过实战案例展示了分布式事务消息和消息过滤功能,并提供性能优化建议。

RocketMQ作为阿里开源的分布式消息中间件,凭借高吞吐、低延迟、高可用的特性成为微服务架构的核心组件。本文将从源码层面拆解Broker、Consumer、Producer的核心机制,结合实战案例揭示底层原理,让你不仅知其然更知其所以然。

一、RocketMQ核心架构总览

先通过架构图建立整体认知,后续源码分析将围绕这些核心组件展开:

image.png

核心组件职责

  • NameServer:轻量级注册中心,管理Broker路由信息,支持动态扩缩容
  • Broker:核心消息存储节点,处理消息存储、投递、过滤等核心逻辑
  • Producer:消息生产者,支持同步/异步/单向发送模式
  • Consumer:消息消费者,支持推/拉模式,集群/广播消费

二、Producer源码深度剖析

2.1 Producer启动流程

image.png

核心代码解析:

package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.springframework.util.StringUtils;

/**
* Producer启动核心逻辑
* @author ken
*/

@Slf4j
public class ProducerStartupAnalysis {
   
   public DefaultMQProducer createProducer(String groupName) throws MQClientException {
       // 1. 参数校验
       if (!StringUtils.hasText(groupName)) {
           throw new MQClientException("Producer group name is null", null);
       }
       
       DefaultMQProducer producer = new DefaultMQProducer(groupName);
       
       // 2. 创建MQClientInstance(核心客户端实例)
       producer.start();
       
       // 3. 定时任务:从NameServer更新路由信息(默认30s一次)
       // DefaultMQProducerImpl#startScheduledTask
       
       return producer;
   }
}

2.2 消息发送核心流程

Producer发送消息的核心入口是DefaultMQProducerImpl#sendDefaultImpl,完整流程:

image.png

2.2.1 消息发送核心代码

package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

/**
* 消息发送核心实现
* @author ken
*/

@Slf4j
public class MessageSendAnalysis {
   
   private final DefaultMQProducer producer;
   
   public MessageSendAnalysis(DefaultMQProducer producer) {
       this.producer = producer;
   }
   
   /**
    * 同步发送消息核心逻辑
    * @param message 消息体
    * @return 发送结果
    * @throws MQClientException 客户端异常
    * @throws RemotingException 远程通信异常
    * @throws MQBrokerException Broker异常
    * @throws InterruptedException 中断异常
    */

   public SendResult sendMessage(Message message)
           throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       // 1. 消息校验
       validateMessage(message);
       
       // 2. 发送消息(核心方法)
       SendResult sendResult = producer.send(message);
       
       // 3. 处理发送结果
       handleSendResult(sendResult, message);
       
       return sendResult;
   }
   
   /**
    * 消息参数校验
    */

   private void validateMessage(Message message) throws MQClientException {
       if (ObjectUtils.isEmpty(message)) {
           throw new MQClientException("Message is null", null);
       }
       if (!org.springframework.util.StringUtils.hasText(message.getTopic())) {
           throw new MQClientException("Message topic is null", null);
       }
       if (ObjectUtils.isEmpty(message.getBody())) {
           throw new MQClientException("Message body is null", null);
       }
   }
   
   /**
    * 处理发送结果
    */

   private void handleSendResult(SendResult sendResult, Message message) {
       if (sendResult != null) {
           switch (sendResult.getSendStatus()) {
               case SEND_OK:
                   log.info("消息发送成功,topic={}, msgId={}", message.getTopic(), sendResult.getMsgId());
                   break;
               default:
                   log.warn("消息发送异常,status={}, msgId={}", sendResult.getSendStatus(), sendResult.getMsgId());
           }
       }
   }
   
   // 测试代码
   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("producer_demo_group");
       producer.setNamesrvAddr("localhost:9876");
       producer.start();
       
       MessageSendAnalysis analysis = new MessageSendAnalysis(producer);
       Message message = new Message("demo_topic", "demo_tag", "Hello RocketMQ".getBytes());
       
       SendResult result = analysis.sendMessage(message);
       log.info("发送结果:{}", result);
       
       TimeUnit.SECONDS.sleep(1);
       producer.shutdown();
   }
}

2.2.2 消息队列选择策略

RocketMQ提供多种队列选择策略,默认实现是AllocateMessageQueueAveragely

package com.jam.demo.producer;

import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
* 队列选择策略实现
* @author ken
*/

public class QueueSelectorAnalysis {
   
   private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
   
   /**
    * 默认队列选择策略:轮询+故障延迟规避
    */

   public MessageQueue selectQueue(List<MessageQueue> mqs, Message msg, Object arg) {
       if (mqs == null || mqs.isEmpty()) {
           return null;
       }
       
       // 如果指定了队列参数,直接使用
       if (arg != null) {
           int index = Integer.parseInt(arg.toString());
           return mqs.get(index % mqs.size());
       }
       
       // 故障延迟规避 + 轮询选择
       for (int i = 0; i < mqs.size(); i++) {
           int index = ThreadLocalRandom.current().nextInt(mqs.size());
           MessageQueue mq = mqs.get(index);
           if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
               return mq;
           }
       }
       
       // 如果所有队列都不可用,返回第一个队列
       return mqs.get(0);
   }
}

2.3 消息发送重试机制

Producer内置完善的重试机制,核心参数:

  • retryTimesWhenSendFailed:同步发送重试次数(默认2次)
  • retryTimesWhenSendAsyncFailed:异步发送重试次数(默认2次)
  • retryAnotherBrokerWhenNotStoreOK:是否重试其他Broker(默认false)

核心实现:

package com.jam.demo.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

/**
* 消息发送重试机制
* @author ken
*/

@Slf4j
public class RetryMechanismAnalysis {
   
   private final DefaultMQProducer producer;
   private int retryTimes = 2; // 默认重试2次
   
   public RetryMechanismAnalysis(DefaultMQProducer producer) {
       this.producer = producer;
       this.retryTimes = producer.getRetryTimesWhenSendFailed();
   }
   
   /**
    * 带重试的消息发送
    */

   public void sendWithRetry(Message message, List<MessageQueue> mqs) {
       for (int i = 0; i <= retryTimes; i++) {
           MessageQueue mq = selectQueue(mqs);
           try {
               // 发送消息
               producer.send(message, mq);
               log.info("消息发送成功,重试次数={}", i);
               return;
           } catch (Exception e) {
               log.error("消息发送失败,重试次数={}", i, e);
               
               // 如果是最后一次重试,抛出异常
               if (i == retryTimes) {
                   throw new RuntimeException("消息发送最终失败", e);
               }
               
               // 延迟一段时间后重试
               try {
                   Thread.sleep(100 * (i + 1));
               } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
               }
           }
       }
   }
   
   private MessageQueue selectQueue(List<MessageQueue> mqs) {
       return mqs.get((int) (System.currentTimeMillis() % mqs.size()));
   }
}

三、Broker源码深度剖析

3.1 Broker核心存储机制

Broker采用混合存储架构,核心存储文件:

  • CommitLog:消息主体存储文件,所有Topic的消息顺序写入
  • ConsumeQueue:消费队列,存储消息在CommitLog的偏移量索引
  • IndexFile:消息索引文件,支持按Key查询消息

image.png

3.1.1 CommitLog写入核心代码

package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
* CommitLog写入机制
* @author ken
*/

@Slf4j
public class CommitLogAnalysis {
   
   private final CommitLog commitLog;
   
   public CommitLogAnalysis(DefaultMessageStore messageStore) {
       this.commitLog = messageStore.getCommitLog();
   }
   
   /**
    * 消息存储核心流程
    */

   public boolean putMessage(MessageExtBrokerInner msg) {
       // 1. 消息校验
       if (msg == null || msg.getBody() == null) {
           log.error("消息为空,无法存储");
           return false;
       }
       
       // 2. 追加到CommitLog
       // CommitLog#putMessage核心逻辑:
       // - 获取当前MappedFile
       // - 计算消息长度
       // - 检查是否有足够空间
       // - 写入消息
       // - 更新偏移量
       
       // 3. 刷盘处理
       handleFlush(msg);
       
       // 4. 主从复制
       handleReplica(msg);
       
       return true;
   }
   
   /**
    * 刷盘策略处理
    */

   private void handleFlush(MessageExtBrokerInner msg) {
       MessageStoreConfig config = commitLog.getDefaultMessageStore().getMessageStoreConfig();
       
       if (config.isFlushDiskTypeSync()) {
           // 同步刷盘:等待刷盘完成
           commitLog.flush(0);
       } else {
           // 异步刷盘:由后台线程处理
           // FlushRealTimeService线程默认每隔500ms刷盘一次
       }
   }
   
   /**
    * 主从复制处理
    */

   private void handleReplica(MessageExtBrokerInner msg) {
       // 根据同步/异步复制策略处理
       // HAService负责主从复制
   }
}

3.1.2 ConsumeQueue构建机制

ConsumeQueue是消息的逻辑队列,每个Topic下的每个Queue对应一个ConsumeQueue文件:

package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;

/**
* ConsumeQueue构建与查询
* @author ken
*/

@Slf4j
public class ConsumeQueueAnalysis {
   
   private final DefaultMessageStore messageStore;
   
   public ConsumeQueueAnalysis(DefaultMessageStore messageStore) {
       this.messageStore = messageStore;
   }
   
   /**
    * 获取消息队列的索引信息
    */

   public SelectMappedBufferResult getIndexInfo(String topic, int queueId, long offset) {
       // 1. 获取ConsumeQueue实例
       ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
       if (consumeQueue == null) {
           log.error("ConsumeQueue不存在,topic={}, queueId={}", topic, queueId);
           return null;
       }
       
       // 2. 查询指定偏移量的索引信息
       // 每个ConsumeQueue条目固定20字节:8字节CommitLog偏移量 + 4字节消息长度 + 8字节Tag哈希值
       return consumeQueue.getIndexBuffer(offset);
   }
   
   /**
    * 构建ConsumeQueue索引(由ReputMessageService线程处理)
    */

   public void buildIndex(String topic, int queueId, long commitLogOffset, int msgSize, long tagsCode) {
       ConsumeQueue consumeQueue = messageStore.findConsumeQueue(topic, queueId);
       if (consumeQueue != null) {
           // 写入索引条目
           consumeQueue.putPositionInfo(commitLogOffset, msgSize, tagsCode);
       }
   }
}

3.2 Broker请求处理机制

Broker通过Netty处理客户端请求,核心请求处理器:

  • SendMessageProcessor:处理消息发送请求
  • PullMessageProcessor:处理消息拉取请求
  • QueryMessageProcessor:处理消息查询请求

package com.jam.demo.broker;

import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.DefaultMessageStore;

/**
* Broker请求处理器核心逻辑
* @author ken
*/

@Slf4j
public class BrokerRequestProcessor {
   
   private final DefaultMessageStore messageStore;
   
   public BrokerRequestProcessor(DefaultMessageStore messageStore) {
       this.messageStore = messageStore;
   }
   
   /**
    * 处理客户端请求
    */

   public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
       switch (request.getCode()) {
           case RequestCode.SEND_MESSAGE:
               return handleSendMessage(ctx, request);
           case RequestCode.PULL_MESSAGE:
               return handlePullMessage(ctx, request);
           default:
               return RemotingCommand.createResponseCommand(null);
       }
   }
   
   /**
    * 处理消息发送请求
    */

   private RemotingCommand handleSendMessage(ChannelHandlerContext ctx, RemotingCommand request) {
       SendMessageRequestHeader header =
           (SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
       
       log.info("接收消息发送请求,topic={}, queueId={}", header.getTopic(), header.getQueueId());
       
       // 1. 构建内部消息对象
       // 2. 存储消息到CommitLog
       // 3. 返回响应结果
       
       RemotingCommand response = RemotingCommand.createResponseCommand(null);
       response.setCode(0);
       response.setRemark(null);
       return response;
   }
   
   /**
    * 处理消息拉取请求
    */

   private RemotingCommand handlePullMessage(ChannelHandlerContext ctx, RemotingCommand request) {
       // 处理拉取请求逻辑
       return RemotingCommand.createResponseCommand(null);
   }
}

3.3 Broker刷盘与主从复制

3.3.1 刷盘策略实现

package com.jam.demo.broker;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
* 刷盘策略实现
* @author ken
*/

@Slf4j
public class FlushStrategyAnalysis {
   
   private final CommitLog commitLog;
   private final MessageStoreConfig storeConfig;
   
   public FlushStrategyAnalysis(CommitLog commitLog) {
       this.commitLog = commitLog;
       this.storeConfig = commitLog.getDefaultMessageStore().getMessageStoreConfig();
   }
   
   /**
    * 同步刷盘实现
    */

   public void syncFlush(long offset) {
       // 等待刷盘完成
       commitLog.flush(offset);
       log.info("同步刷盘完成,偏移量={}", offset);
   }
   
   /**
    * 异步刷盘实现(后台线程)
    */

   public class AsyncFlushService extends Thread {
       
       private volatile boolean isRunning = true;
       
       @Override
       public void run() {
           log.info("异步刷盘服务启动");
           
           while (isRunning) {
               try {
                   // 默认500ms刷盘一次
                   Thread.sleep(storeConfig.getFlushIntervalCommitLog());
                   
                   // 刷盘
                   commitLog.flush(0);
               } catch (InterruptedException e) {
                   log.warn("异步刷盘线程中断", e);
               }
           }
           
           log.info("异步刷盘服务停止");
       }
       
       public void shutdown() {
           isRunning = false;
           this.interrupt();
       }
   }
}

四、Consumer源码深度剖析

4.1 Consumer启动流程

image.png

4.2 消息消费模式

RocketMQ支持两种消费模式:

4.2.1 推模式(默认)

推模式本质是长轮询,核心实现DefaultMQPushConsumerImpl#pullMessage

package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 推模式消费实现
* @author ken
*/

@Slf4j
public class PushConsumerAnalysis {
   
   public void startPushConsumer() throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo_group");
       consumer.setNamesrvAddr("localhost:9876");
       
       // 订阅Topic
       consumer.subscribe("demo_topic", "*");
       
       // 注册消息监听器
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
           
           // 处理消息
           for (MessageExt msg : msgs) {
               log.info("消费消息:topic={}, msgId={}, body={}",
                   msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
           }
           
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       
       // 启动消费者
       consumer.start();
       log.info("推模式消费者启动成功");
   }
   
   public static void main(String[] args) throws Exception {
       PushConsumerAnalysis analysis = new PushConsumerAnalysis();
       analysis.startPushConsumer();
       
       // 保持进程运行
       Thread.currentThread().join();
   }
}

4.2.2 拉模式

拉模式由消费者主动控制拉取频率:

package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 拉模式消费实现
* @author ken
*/

@Slf4j
public class PullConsumerAnalysis {
   
   public void startPullConsumer() throws Exception {
       DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull_consumer_group");
       consumer.setNamesrvAddr("localhost:9876");
       
       // 订阅Topic
       consumer.subscribe("demo_topic", "*");
       
       // 启动消费者
       consumer.start();
       log.info("拉模式消费者启动成功");
       
       // 循环拉取消息
       while (true) {
           List<MessageExt> msgs = consumer.poll();
           
           if (!CollectionUtils.isEmpty(msgs)) {
               for (MessageExt msg : msgs) {
                   log.info("拉取消息:topic={}, msgId={}, body={}",
                       msg.getTopic(), msg.getMsgId(), new String(msg.getBody()));
               }
               
               // 提交偏移量
               consumer.commitSync();
           }
           
           // 控制拉取频率
           Thread.sleep(1000);
       }
   }
   
   public static void main(String[] args) throws Exception {
       PullConsumerAnalysis analysis = new PullConsumerAnalysis();
       analysis.startPullConsumer();
   }
}

4.3 消费偏移量管理

RocketMQ通过OffsetStore管理消费偏移量:

package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.OffsetStore;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Map;

/**
* 消费偏移量管理
* @author ken
*/

@Slf4j
public class OffsetManagementAnalysis {
   
   private final DefaultMQPushConsumer consumer;
   private OffsetStore offsetStore;
   
   public OffsetManagementAnalysis(DefaultMQPushConsumer consumer) {
       this.consumer = consumer;
       initOffsetStore();
   }
   
   /**
    * 初始化偏移量存储
    */

   private void initOffsetStore() {
       // 集群模式:偏移量存储在Broker
       // 广播模式:偏移量存储在本地
       if (consumer.getMessageModel().isBroadcast()) {
           offsetStore = new LocalFileOffsetStore(consumer.getDefaultMQPushConsumerImpl().getMQClientFactory(),
                                                 consumer.getConsumerGroup());
       } else {
           offsetStore = new RemoteBrokerOffsetStore(consumer.getDefaultMQPushConsumerImpl().getMQClientFactory(),
                                                   consumer.getConsumerGroup());
       }
   }
   
   /**
    * 更新偏移量
    */

   public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
       offsetStore.updateOffset(mq, offset, increaseOnly);
       log.info("更新偏移量:topic={}, queueId={}, offset={}", mq.getTopic(), mq.getQueueId(), offset);
   }
   
   /**
    * 持久化偏移量
    */

   public void persistOffset(MessageQueue mq) {
       offsetStore.persist(mq);
       log.info("持久化偏移量:topic={}, queueId={}", mq.getTopic(), mq.getQueueId());
   }
   
   /**
    * 批量持久化偏移量
    */

   public void persistAllOffsets() {
       offsetStore.persistAll();
       log.info("批量持久化所有偏移量");
   }
   
   /**
    * 查询偏移量
    */

   public long queryOffset(MessageQueue mq) {
       return offsetStore.readOffset(mq, org.apache.rocketmq.client.consumer.store.ReadOffsetType.MEMORY_FIRST_THEN_STORE);
   }
}

4.4 消息重试机制

消费失败时,RocketMQ会将消息发送到重试队列(%RETRY%+ConsumerGroup):

package com.jam.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 消费重试机制
* @author ken
*/

@Slf4j
public class ConsumeRetryAnalysis {
   
   public void startConsumerWithRetry() throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_retry_group");
       consumer.setNamesrvAddr("localhost:9876");
       
       // 设置最大重试次数(默认16次)
       consumer.setMaxReconsumeTimes(3);
       
       // 订阅主Topic和重试Topic
       consumer.subscribe("demo_topic", "*");
       
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
           
           for (MessageExt msg : msgs) {
               try {
                   // 模拟消费失败
                   if (msg.getReconsumeTimes() < 2) {
                       log.info("模拟消费失败,重试次数={}", msg.getReconsumeTimes());
                       throw new RuntimeException("消费失败,需要重试");
                   }
                   
                   // 正常消费
                   log.info("消费成功:msgId={}, reconsumeTimes={}", msg.getMsgId(), msg.getReconsumeTimes());
               } catch (Exception e) {
                   log.error("消费异常", e);
                   
                   // 判断是否达到最大重试次数
                   if (msg.getReconsumeTimes() >= consumer.getMaxReconsumeTimes()) {
                       log.error("达到最大重试次数,消息进入死信队列:msgId={}", msg.getMsgId());
                       // 可以手动发送到死信队列
                       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   }
                   
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
           }
           
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       
       consumer.start();
       log.info("带重试机制的消费者启动成功");
   }
   
   public static void main(String[] args) throws Exception {
       ConsumeRetryAnalysis analysis = new ConsumeRetryAnalysis();
       analysis.startConsumerWithRetry();
       
       Thread.currentThread().join();
   }
}

五、实战案例:RocketMQ核心功能应用

5.1 分布式事务消息

RocketMQ通过二阶段提交实现分布式事务消息:

package com.jam.demo.transaction;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;

import java.util.concurrent.*;

/**
* 分布式事务消息实现
* @author ken
*/

@Slf4j
public class TransactionMessageDemo {
   
   public static void main(String[] args) throws Exception {
       // 创建事务生产者
       TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
       producer.setNamesrvAddr("localhost:9876");
       
       // 线程池用于执行本地事务
       ExecutorService executorService = new ThreadPoolExecutor(
           2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
           r -> {
               Thread thread = new Thread(r);
               thread.setName("transaction-thread");
               return thread;
           });
       
       producer.setExecutorService(executorService);
       
       // 注册事务监听器
       producer.setTransactionListener(new TransactionListener() {
           
           /**
            * 执行本地事务
            */

           @Override
           public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
               String transactionId = msg.getKeys();
               log.info("执行本地事务,transactionId={}", transactionId);
               
               try {
                   // 执行业务逻辑:扣减库存、创建订单等
                   boolean success = executeBusinessLogic(transactionId);
                   
                   if (success) {
                       log.info("本地事务执行成功,提交消息");
                       return LocalTransactionState.COMMIT_MESSAGE;
                   } else {
                       log.info("本地事务执行失败,回滚消息");
                       return LocalTransactionState.ROLLBACK_MESSAGE;
                   }
               } catch (Exception e) {
                   log.error("本地事务执行异常", e);
                   return LocalTransactionState.UNKNOW;
               }
           }
           
           /**
            * 事务回查
            */

           @Override
           public LocalTransactionState checkLocalTransaction(MessageExt msg) {
               String transactionId = msg.getKeys();
               log.info("事务回查,transactionId={}", transactionId);
               
               // 查询本地事务状态
               boolean isCommit = checkBusinessStatus(transactionId);
               
               if (isCommit) {
                   return LocalTransactionState.COMMIT_MESSAGE;
               } else {
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
       });
       
       producer.start();
       
       // 发送事务消息
       String transactionId = "tx_" + System.currentTimeMillis();
       Message message = new Message("transaction_topic", "tx_tag",
           transactionId, "订单创建请求".getBytes());
       
       producer.sendMessageInTransaction(message, null);
       
       log.info("事务消息发送成功,transactionId={}", transactionId);
       
       TimeUnit.SECONDS.sleep(10);
       producer.shutdown();
       executorService.shutdown();
   }
   
   /**
    * 执行业务逻辑
    */

   private static boolean executeBusinessLogic(String transactionId) {
       // 模拟业务逻辑执行
       return !transactionId.endsWith("0"); // 最后一位为0时模拟失败
   }
   
   /**
    * 检查业务状态
    */

   private static boolean checkBusinessStatus(String transactionId) {
       // 模拟查询数据库检查事务状态
       return StringUtils.hasText(transactionId);
   }
}

5.2 消息过滤机制

RocketMQ支持Tag过滤和SQL92过滤:

package com.jam.demo.filter;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

/**
* 消息过滤实战
* @author ken
*/

@Slf4j
public class MessageFilterDemo {
   
   public static void main(String[] args) throws Exception {
       // 1. Tag过滤
       DefaultMQPushConsumer tagConsumer = new DefaultMQPushConsumer("tag_filter_consumer");
       tagConsumer.setNamesrvAddr("localhost:9876");
       // 只消费tag1或tag2的消息
       tagConsumer.subscribe("filter_topic", "tag1 || tag2");
       tagConsumer.registerMessageListener((msgs, context) -> {
           for (MessageExt msg : msgs) {
               log.info("Tag过滤消费:tag={}, body={}", msg.getTags(), new String(msg.getBody()));
           }
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       tagConsumer.start();
       
       // 2. SQL92过滤
       DefaultMQPushConsumer sqlConsumer = new DefaultMQPushConsumer("sql_filter_consumer");
       sqlConsumer.setNamesrvAddr("localhost:9876");
       // 消费属性a>5且b='abc'的消息
       sqlConsumer.subscribe("filter_topic", MessageSelector.bySql("a > 5 AND b = 'abc'"));
       sqlConsumer.registerMessageListener((msgs, context) -> {
           for (MessageExt msg : msgs) {
               log.info("SQL过滤消费:a={}, b={}, body={}",
                   msg.getUserProperty("a"), msg.getUserProperty("b"), new String(msg.getBody()));
           }
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       sqlConsumer.start();
       
       log.info("过滤消费者启动成功");
   }
}

六、性能优化与最佳实践

6.1 Producer性能优化

package com.jam.demo.optimize;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
* Producer性能优化配置
* @author ken
*/

public class ProducerOptimize {
   
   public DefaultMQProducer createOptimizedProducer() {
       DefaultMQProducer producer = new DefaultMQProducer("optimize_producer_group");
       
       // 批量发送(提高吞吐量)
       producer.setBatchSize(1000);
       producer.setCompressMsgBodyOverHowmuch(1024 * 4); // 4KB以上压缩
       
       // 异步发送(提高响应速度)
       producer.setAsyncSenderThreadPoolNums(8);
       
       // 超时配置
       producer.setSendMsgTimeout(3000);
       
       // 重试配置
       producer.setRetryTimesWhenSendFailed(2);
       producer.setRetryAnotherBrokerWhenNotStoreOK(true);
       
       // 消息最大长度(默认4MB)
       producer.setMaxMessageSize(1024 * 1024 * 4);
       
       return producer;
   }
   
   /**
    * 批量发送示例
    */

   public void batchSend(DefaultMQProducer producer) throws Exception {
       Message[] messages = new Message[10];
       for (int i = 0; i < 10; i++) {
           messages[i] = new Message("batch_topic", "batch_tag", ("Batch message " + i).getBytes());
       }
       
       SendResult result = producer.send(messages);
       System.out.println("批量发送结果:" + result);
   }
}

6.2 Broker性能优化

Broker优化核心配置:

# 存储路径配置

storePathRootDir=/data/rocketmq/store

storePathCommitLog=/data/rocketmq/store/commitlog

storePathConsumeQueue=/data/rocketmq/store/consumequeue


# 刷盘策略(生产环境建议异步刷盘)

flushDiskType=ASYNC_FLUSH


# 文件大小配置

mapedFileSizeCommitLog=1073741824  # 1GB

mapedFileSizeConsumeQueue=300000    # 300KB


# 清理过期文件

deleteWhen=04

fileReservedTime=72


# 线程池配置

sendMessageThreadPoolNums=16

pullMessageThreadPoolNums=16


# 网络配置

listenPort=10911

serverWorkerThreads=8

serverCallbackExecutorThreads=8

七、总结

本文从源码层面深度剖析了RocketMQ的Producer、Broker、Consumer三大核心组件,通过流程图、核心代码解析和实战案例,系统讲解了:

  1. Producer:消息发送流程、队列选择策略、重试机制
  2. Broker:消息存储机制、刷盘策略、主从复制
  3. Consumer:消费模式、偏移量管理、重试机制
  4. 实战应用:分布式事务、消息过滤、性能优化

RocketMQ的设计体现了高性能、高可用、可扩展的分布式系统设计思想,深入理解其源码有助于我们更好地使用和优化RocketMQ,解决实际生产环境中的问题。

附录:完整pom.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   
   <groupId>com.jam.demo</groupId>
   <artifactId>rocketmq-source-analysis</artifactId>
   <version>1.0.0</version>
   
   <properties>
       <maven.compiler.source>17</maven.compiler.source>
       <maven.compiler.target>17</maven.compiler.target>
       <rocketmq.version>5.1.4</rocketmq.version>
       <lombok.version>1.18.30</lombok.version>
       <spring.version>6.1.2</spring.version>
   </properties>
   
   <dependencies>
       <!-- RocketMQ客户端 -->
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-client</artifactId>
           <version>${rocketmq.version}</version>
       </dependency>
       
       <!-- Lombok -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       
       <!-- Spring Context -->
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context</artifactId>
           <version>${spring.version}</version>
       </dependency>
       
       <!-- Spring Utils -->
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-core</artifactId>
           <version>${spring.version}</version>
       </dependency>
       
       <!-- Guava -->
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>32.1.3-jre</version>
       </dependency>
       
       <!-- Fastjson2 -->
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>2.0.32</version>
       </dependency>
   </dependencies>
</project>

目录
相关文章
|
5天前
|
云安全 人工智能 安全
AI被攻击怎么办?
阿里云提供 AI 全栈安全能力,其中对网络攻击的主动识别、智能阻断与快速响应构成其核心防线,依托原生安全防护为客户筑牢免疫屏障。
|
15天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
9天前
|
安全 Java Android开发
深度解析 Android 崩溃捕获原理及从崩溃到归因的闭环实践
崩溃堆栈全是 a.b.c?Native 错误查不到行号?本文详解 Android 崩溃采集全链路原理,教你如何把“天书”变“说明书”。RUM SDK 已支持一键接入。
614 216
|
存储 人工智能 监控
从代码生成到自主决策:打造一个Coding驱动的“自我编程”Agent
本文介绍了一种基于LLM的“自我编程”Agent系统,通过代码驱动实现复杂逻辑。该Agent以Python为执行引擎,结合Py4j实现Java与Python交互,支持多工具调用、记忆分层与上下文工程,具备感知、认知、表达、自我评估等能力模块,目标是打造可进化的“1.5线”智能助手。
855 61
|
7天前
|
人工智能 移动开发 自然语言处理
2025最新HTML静态网页制作工具推荐:10款免费在线生成器小白也能5分钟上手
晓猛团队精选2025年10款真正免费、无需编程的在线HTML建站工具,涵盖AI生成、拖拽编辑、设计稿转代码等多种类型,均支持浏览器直接使用、快速出图与文件导出,特别适合零基础用户快速搭建个人网站、落地页或企业官网。
1282 157
|
5天前
|
编解码 Linux 数据安全/隐私保护
教程分享免费视频压缩软件,免费视频压缩,视频压缩免费,附压缩方法及学习教程
教程分享免费视频压缩软件,免费视频压缩,视频压缩免费,附压缩方法及学习教程
241 138
|
7天前
|
存储 安全 固态存储
四款WIN PE工具,都可以实现U盘安装教程
Windows PE是基于NT内核的轻量系统,用于系统安装、分区管理及故障修复。本文推荐多款PE制作工具,支持U盘启动,兼容UEFI/Legacy模式,具备备份还原、驱动识别等功能,操作简便,适合新旧电脑维护使用。
529 109