RocketMQ事务消息原理简析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等。利用RocketMQ事务消息特性,可以轻松达到这个目的。本文将从RocketMQ事务消息使用方法说起,探究RocketMQ事务消息实现原理。

零、业务场景

在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等,交易系统要将订单落入DB和发送订单消息保证一致,不能本地事务回滚,订单没有生成但是发送了创建订单消息,下游系统产生脏数据,也不能订单已经创建,但是下游系统没有感知继而无法履约,影响用户体验。

如果让我们自己实现的话,当然也是有办法的。比如在业务数据库中建立一张消息表用于存储消息,将业务数据和消息数据放在同一个事务中进行存储,就可以利用数据库事务保证同时原子性。后续可以定时扫描消息表,将消息数据再发送出去。

当然也可以用现成的解决方案,RocketMQ从4.3.0版本开始,支持事务消息。我们只需要编写对应的本地事务执行方法executeLocalTransaction和本地事务执行结果检查方法checkLocalTransaction,RocketMQ会自动调用本地事务执行。如果本地事务执行成功,下游才能消费到消息,如果本地事务执行失败,下游是无法感知到这条消息的

一、使用方法

使用RocketMQ发送事务消息,只有消息发送和普通消息发送有所区别。参见官方示例:


// TransactionProducer.java

// 需要自定义一个TransactionListener用于执行事务executeLocalTransaction和事务执行结果回查checkLocalTransaction 代码在下面

TransactionListener transactionListener = new TransactionListenerImpl();

// 事务消息发送producer

TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);

producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

// 创建一个线程池 用于Broker回查本地事务执行状态 如果这里没有创建,RocketMQ会自动创建一个线程池

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {

 Thread thread = new Thread(r);

 thread.setName("client-transaction-msg-check-thread");

 return thread;

});


producer.setExecutorService(executorService);

producer.setTransactionListener(transactionListener);

producer.start();


String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

for (int i = 0; i < MESSAGE_COUNT; i++) {

 try {

   Message msg =

     new Message(TOPIC, tags[i % tags.length], "KEY" + i,

                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

   // 发送事务消息

   SendResult sendResult = producer.sendMessageInTransaction(msg, null);

   System.out.printf("%s%n", sendResult);


   Thread.sleep(10);

 } catch (MQClientException | UnsupportedEncodingException e) {

   e.printStackTrace();

 }

}


public class TransactionListenerImpl implements TransactionListener {

   private AtomicInteger transactionIndex = new AtomicInteger(0);


   private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();


   /**

    * 执行本地事务

    */

   @Override

   public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

       int value = transactionIndex.getAndIncrement();

       int status = value % 3;

       localTrans.put(msg.getTransactionId(), status);

       return LocalTransactionState.UNKNOW;

   }


   /**

    * 本地事务执行状态回查

    */

   @Override

   public LocalTransactionState checkLocalTransaction(MessageExt msg) {

       Integer status = localTrans.get(msg.getTransactionId());

       if (null != status) {

           switch (status) {

               case 0:

                   return LocalTransactionState.UNKNOW;

               case 1:

                   return LocalTransactionState.COMMIT_MESSAGE;

               case 2:

                   return LocalTransactionState.ROLLBACK_MESSAGE;

               default:

                   return LocalTransactionState.COMMIT_MESSAGE;

           }

       }

       return LocalTransactionState.COMMIT_MESSAGE;

   }

}


本地事务的执行状态,有三种结果:

  • LocalTransactionState.COMMIT_MESSAGE:事务执行成功,Broker会处理消息供下游消费
  • LocalTransactionState.ROLLBACK_MESSAGE:事务被回滚,Broker会删除消息,下游感知不到消息
  • LocalTransactionState.UNKNOW:事务的执行结果未知,比如事务还在执行中,稍后Broker会回重复回查,直到超过最大时间或者最大次数

二、原理解析

0、整体流程

  1. Producer发送事务消息
  2. Broker端SendMessageProcessor收到消息后,判断如果是一条事务消息,会将消息原来的topic和队列id存储到消息拓展中,设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC然后 进行存储,然后通知Producer
  3. Producer收到Broker消息发送成功后,开始执行本地事务
  4. 本地事务执行完毕,Producer将事务执行状态通知Broker
  5. Broker端EndTransactionProcessor收到事务执行状态,从RMQ_SYS_TRANS_HALF_TOPIC中取出消息。如果事务执行成功,则从消息拓展中取出原本的topic和队列id,存储到真实的topic和队列id中,存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中;如果是事务回滚,只把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中
  6. 如果Broker没有收到Producer事务执行状态的通知,Broker端TransactionalMessageCheckService会主动定时从RMQ_SYS_TRANS_HALF_TOPIC中捞取消息,判断是否有需要回查的消息
  7. 如果有需要回查的消息,Broker端TransactionalMessageCheckService会向Producer回查事务状态
  8. Producer执行TransactionListener的checkLocalTransaction方法,查询事务执行状态
  9. Producer查询本地事务状态之后再执行上述第4步和第5步

1、Producer发送消息

// 编写TransactionListener实现类用于执行本地事务和本地事务回查

TransactionListener transactionListener = new TransactionListenerImpl();

// 发送事务消息专用的TransactionMQProducer

TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);


producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

// 新建一个线程池用于异步执行从Broker过来的事务回查 如果这里不新建 Broker也会自动创建一个

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {

   Thread thread = new Thread(r);

   thread.setName("client-transaction-msg-check-thread");

   return thread;

});


producer.setExecutorService(executorService);

producer.setTransactionListener(transactionListener);

producer.start();


String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

for (int i = 0; i < MESSAGE_COUNT; i++) {

   try {

       Message msg =

           new Message(TOPIC, tags[i % tags.length], "KEY" + i,

               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

       // 发送事务消息

       SendResult sendResult = producer.sendMessageInTransaction(msg, null);

       System.out.printf("%s%n", sendResult);


       Thread.sleep(10);

   } catch (MQClientException | UnsupportedEncodingException e) {

       e.printStackTrace();

   }

}

// 发送事务消息

public TransactionSendResult sendMessageInTransaction(final Message msg,

   final LocalTransactionExecuter localTransactionExecuter, final Object arg)

   throws MQClientException {

   // 校验是否设置TransactionListener 发送事务消息必须要有TransactionListener

   TransactionListener transactionListener = getCheckListener();

   if (null == localTransactionExecuter && null == transactionListener) {

       throw new MQClientException("tranExecutor is null", null);

   }


   // ignore DelayTimeLevel parameter

   if (msg.getDelayTimeLevel() != 0) {

       // 事务消息不支持延迟发送

       MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

   }


   Validators.checkMessage(msg, this.defaultMQProducer);


   SendResult sendResult = null;

   // 标记prepare消息 Broker根据这个判断是否是一条事务消息

   MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

   // 设置消息生产者组 为了查询事务消息本地事务状态时 从该生产者组中随机选择一个消息生产者即可

   MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

   try {

       sendResult = this.send(msg);

   } catch (Exception e) {

       throw new MQClientException("send message Exception", e);

   }


   // ......

   // 对发送结果的处理稍后解析

}

2、Broker接收事务消息

// asyncSendMessage方法

CompletableFuture<PutMessageResult> putMessageResult = null;

// 依据消息是否有MessageConst.PROPERTY_TRANSACTION_PREPARED判断是否事务消息

String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (Boolean.parseBoolean(transFlag)) {

   // 处理事务消息

   if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {

       response.setCode(ResponseCode.NO_PERMISSION);

       response.setRemark(

               "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()

                       + "] sending transaction message is forbidden");

       return CompletableFuture.completedFuture(response);

   }

   // 保存事务消息

   putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);

} else {

   // 保存消息

   putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {

   // 备份原本的topic和队列

   MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());

   MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,

       String.valueOf(msgInner.getQueueId()));

   msgInner.setSysFlag(

       MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

   // 设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC

msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());

   // 队列是写死的 只有一个 也就是说是顺序处理的

   msgInner.setQueueId(0);

   msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

   return msgInner;

}

3、Producer处理发送消息结果

// sendMessageInTransaction方法

try {

   // 事务消息发送结果

   sendResult = this.send(msg);

} catch (Exception e) {

   throw new MQClientException("send message Exception", e);

}


LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;

Throwable localException = null;

// 处理事务消息发送结果

switch (sendResult.getSendStatus()) {

   // 发送成功的情况

   case SEND_OK: {

       try {

           if (sendResult.getTransactionId() != null) {

               msg.putUserProperty("__transactionId__", sendResult.getTransactionId());

           }

           String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

           if (null != transactionId && !"".equals(transactionId)) {

               msg.setTransactionId(transactionId);

           }

           if (null != localTransactionExecuter) {

               // 这个已经废弃了 不会进入这里

               localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

           } else if (transactionListener != null) {

               // 这里执行本地事务

               log.debug("Used new transaction API");

               localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

           }

           if (null == localTransactionState) {

               localTransactionState = LocalTransactionState.UNKNOW;

           }


           if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {

               log.info("executeLocalTransactionBranch return {}", localTransactionState);

               log.info(msg.toString());

           }

       } catch (Throwable e) {

           // 有catch逻辑 是考虑到事务执行异常的场景

           log.info("executeLocalTransactionBranch exception", e);

           log.info(msg.toString());

           localException = e;

       }

   }

   break;

   case FLUSH_DISK_TIMEOUT:

   case FLUSH_SLAVE_TIMEOUT:

   case SLAVE_NOT_AVAILABLE:

       localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

       break;

   default:

       break;

}

4、Producer通知Broker事务执行状态

try {

   // 事务消息收尾工作 通知Broker干活

   this.endTransaction(msg, sendResult, localTransactionState, localException);

} catch (Exception e) {

   log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);

}

public void endTransaction(

   final Message msg,

   final SendResult sendResult,

   final LocalTransactionState localTransactionState,

   final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

   final MessageId id;

   if (sendResult.getOffsetMsgId() != null) {

       id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());

   } else {

       id = MessageDecoder.decodeMessageId(sendResult.getMsgId());

   }

   String transactionId = sendResult.getTransactionId();

   final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

   EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();

   requestHeader.setTransactionId(transactionId);

   requestHeader.setCommitLogOffset(id.getOffset());

   // 设置本地事务执行状态

   switch (localTransactionState) {

       case COMMIT_MESSAGE:

           requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);

           break;

       case ROLLBACK_MESSAGE:

           requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);

           break;

       case UNKNOW:

           requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);

           break;

       default:

           break;

   }


   doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);

   requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

   requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());

   requestHeader.setMsgId(sendResult.getMsgId());

   String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

   // 发送消息给Broker

   this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,

       this.defaultMQProducer.getSendMsgTimeout());

}

public void endTransactionOneway(

   final String addr,

   final EndTransactionRequestHeader requestHeader,

   final String remark,

   final long timeoutMillis

) throws RemotingException, InterruptedException {

   // 指定Broker使用EndTransactionProcessor处理

   RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);


   request.setRemark(remark);

   // 单向消息 不考虑发送结果

   // 也就是说 是可能发送失败的 发送失败之后Broker会回查

   this.remotingClient.invokeOneway(addr, request, timeoutMillis);

}

5、Broker处理事务执行状态通知

// processRequest方法

// 上面的代理是Broker向Producer回查事务后的处理 稍后解析

else {

   // 发送半消息之后产生的调用

   switch (requestHeader.getCommitOrRollback()) {

       // 如果事务执行不是commit或者rollback 直接返回 不再进行下面的逻辑

       case MessageSysFlag.TRANSACTION_NOT_TYPE: {

           LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."

                   + "RequestHeader: {} Remark: {}",

               RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

               requestHeader.toString(),

               request.getRemark());

           return null;

       }


       // 如果事务执行是commit 接着下面的处理

       case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {

           break;

       }

    // 如果事务执行是rollback 打印异常日志 接着下面的处理

       case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {

           LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."

                   + "RequestHeader: {} Remark: {}",

               RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

               requestHeader.toString(),

               request.getRemark());

           break;

       }

       default:

           return null;

   }

}

OperationResult result = new OperationResult();

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {

   // 根据偏移量 取出topic是RMQ_SYS_TRANS_HALF_TOPIC的消息

   // 第2步Broker保存消息之后 会把偏移量通知Producer Producer再传到这里

   result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);

   // 事务执行状态是commit

   if (result.getResponseCode() == ResponseCode.SUCCESS) {

       RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

       if (res.getCode() == ResponseCode.SUCCESS) {

           // 从消息中取出原来的topic和队列等信息 构建真实消息

           MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());

           msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

           msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());

           msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());

           msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());

           // 清除事务消息相关标记 防止循环处理

           MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);

           // 保存真实消息

           RemotingCommand sendResult = sendFinalMessage(msgInner);

           if (sendResult.getCode() == ResponseCode.SUCCESS) {

               // 删除消息RMQ_SYS_TRANS_HALF_TOPIC

               // 实际上是投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 并标记删除

               // 这里为什么还要投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 不直接删除呢 后面还需要根据这个判断是否是重复处理等

               this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());

           }

           return sendResult;

       }

       return res;

   }

} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {

   // 如果事务状态是rollback 删除消息RMQ_SYS_TRANS_HALF_TOPIC 投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中并标记删除

   // 比commit少了一个投递真实主题的步骤

   result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);

   if (result.getResponseCode() == ResponseCode.SUCCESS) {

       RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

       if (res.getCode() == ResponseCode.SUCCESS) {

           this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());

       }

       return res;

   }

}

6、Broker主动捞取消息

TransactionalMessageCheckService类实现Runnable接口,在Broker启动的时候,回调用BrokerController的start方法,在start方法中,会调用TransactionalMessageCheckService的start方法启动线程,run方法是一个死循环,默认每6秒执行一次

public void run() {

   log.info("Start transaction check service thread!");

   long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();

   while (!this.isStopped()) {

       this.waitForRunning(checkInterval);

   }

   log.info("End transaction check service thread!");

}

循环中实际执行的是这个方法

protected void onWaitEnd() {

   // 事务过期时间 只有当消息存储时间加上这个过期时间大于系统当前时间 才对消息执行事务回查 否则在下一次周期中执行事务回查操作

   long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();

   // 事务回查最大检测次数 如果超过最大检测次数还是无法获知消息的事务状态 不会再会回查 直接丢弃相当于回滚事务

   int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();

   long begin = System.currentTimeMillis();

   log.info("Begin to check prepare message, begin time:{}", begin);

   this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());

   log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);

}

public void check(long transactionTimeout, int transactionCheckMax,

       AbstractTransactionalMessageCheckListener listener) {

   try {

       // 获取事务半消息主题下的全部队列 然后依次处理

       String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;

       Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);

       if (msgQueues == null || msgQueues.size() == 0) {

           log.warn("The queue of topic is empty :" + topic);

           return;

       }

       log.debug("Check topic={}, queues={}", topic, msgQueues);

       for (MessageQueue messageQueue : msgQueues) {

           long startTime = System.currentTimeMillis();

           MessageQueue opQueue = getOpQueue(messageQueue);

           // RMQ_SYS_TRANS_HALF_TOPIC消息消费进度

           long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);

           // 收到事务消息提交或者回滚请求后的MQ_SYS_TRANS_OP_HALF_TOPIC中消息消费进度

           long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

           log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);

           if (halfOffset < 0 || opOffset < 0) {

               log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,

                   halfOffset, opOffset);

               continue;

           }


           List<Long> doneOpOffset = new ArrayList<>();

           HashMap<Long, Long> removeMap = new HashMap<>();

           // 根据当前的处理进度 依次从已处理队列MQ_SYS_TRANS_OP_HALF_TOPIC拉取32条消息

           PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

           if (null == pullResult) {

               log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",

                   messageQueue, halfOffset, opOffset);

               continue;

           }

           // single thread

           // 获取空消息的次数

           int getMessageNullCount = 1;

           // 当前处理半消息的进度

           long newOffset = halfOffset;

           // 当前处理消息的队列偏移量

           long i = halfOffset;

           while (true) {

               // 超过时常等待下次调度

               if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {

                   log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);

                   break;

               }

               // 如果该消息已被处理 继续处理下一条消息

               if (removeMap.containsKey(i)) {

                   log.debug("Half offset {} has been committed/rolled back", i);

                   Long removedOpOffset = removeMap.remove(i);

                   doneOpOffset.add(removedOpOffset);

               } else {

                   // 获取消息

                   GetResult getResult = getHalfMsg(messageQueue, i);

                   MessageExt msgExt = getResult.getMsg();

                   if (msgExt == null) {

                       // 超过重试次数 直接跳出 结束该消息队列的事务回查状态

                       if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {

                           break;

                       }

                       // 没有新的消息而返回 结束该消息队列的事务回查

                       if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {

                           log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,

                               messageQueue, getMessageNullCount, getResult.getPullResult());

                           break;

                       } else {

                           log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",

                               i, messageQueue, getMessageNullCount, getResult.getPullResult());

                           // 其它原因 重新拉取

                           i = getResult.getPullResult().getNextBeginOffset();

                           newOffset = i;

                           continue;

                       }

                   }


                   // needDiscard 如果该消息回查的次数超过允许回查的最大次数 该消息将被丢弃 事务消息提交失败 每回查一次 在消息属性中+1 默认回查最大次数为5

                   // needSkip 如果事务消息超过文件的过期时间 默认72小时 则跳过该消息

                   if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {

                       listener.resolveDiscardMsg(msgExt);

                       newOffset = i + 1;

                       i++;

                       continue;

                   }

                   if (msgExt.getStoreTimestamp() >= startTime) {

                       log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,

                           new Date(msgExt.getStoreTimestamp()));

                       break;

                   }


                   // 消息已存储时间 当前系统时间减去消息存储时间

                   long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();

                   // checkImmunityTime 立即检测事务消息的时间 设计的意义是 应用程序在发送事务消息后 事务不会马上提交 该时间就是假设事务消息发送成功后 应用程序事务提交时间 在这段时间内 RocketMQ任务事务未提交 不应该在这个时间段内向应用程序发送回查请求

                   // transactionTimeout 事务消息的超时时间 这个时间是从OP拉取消息的最后一条消息存储时间与check方法开始的时间 如果时间差超过了transactionTimeout 就算时间小于checkImmunityTime 也发送事务回查指令

                   long checkImmunityTime = transactionTimeout;

                   // PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS 消息事务消息回查的最晚时间 单位为秒 指的是程序发送事务消息 可以指定该事务消息的有效时间 只有在这个时间内收到回查消息才有效 默认为null

                   String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);

                   if (null != checkImmunityTimeStr) {

                       checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);

                       if (valueOfCurrentMinusBorn < checkImmunityTime) {

                           if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {

                               newOffset = i + 1;

                               i++;

                               continue;

                           }

                       }

                   } else {

                       // 如果当前时间没过应用程序事务结束时间 跳出本次处理

                       if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {

                           log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,

                               checkImmunityTime, new Date(msgExt.getBornTimestamp()));

                           break;

                       }

                   }

                   List<MessageExt> opMsg = pullResult.getMsgFoundList();

                   // 如果OP队列中没有已处理消息并且已经超过应用程序事务结束时间transactionTimeout

                   // 或者

                   // 操作队列不为空并且最后一条消息的存储时间已经超过transactionTimeout

                   boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)

                       || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))

                       || (valueOfCurrentMinusBorn <= -1);


                   if (isNeedCheck) {

                       // 这里回查是异步处理的 所以在回查之前 需要把消息重新投递到队列中 以便下次check

                       if (!putBackHalfMsgQueue(msgExt, i)) {

                           continue;

                       }

                       // 执行回查逻辑

                       listener.resolveHalfMsg(msgExt);

                   } else {

                       // 如果无法判断是否发送回查消息 则加载更多的已处理消息进行筛选

                       pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);

                       log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,

                           messageQueue, pullResult);

                       continue;

                   }

               }

               newOffset = i + 1;

               i++;

           }

           if (newOffset != halfOffset) {

               // 保存半消息的回查进度

               transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);

           }

           long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);

           if (newOpOffset != opOffset) {

               // 保存OP进度

               transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);

           }

       }

   } catch (Throwable e) {

       log.error("Check error", e);

   }


}

7、Broker主动回查事务状态

public void resolveHalfMsg(final MessageExt msgExt) {

   executorService.execute(new Runnable() {

       @Override

       public void run() {

           try {

               sendCheckMessage(msgExt);

           } catch (Exception e) {

               LOGGER.error("Send check message error!", e);

           }

       }

   });

}


public void sendCheckMessage(MessageExt msgExt) throws Exception {

   CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();

   checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());

   checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());

   checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));

   checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());

   checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());

   msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));

   msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

   msgExt.setStoreSize(0);

   String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);

   // 从同一个生产者组中选择一个Producer进行回查

   // 所以同一个生产者组中如果部分机器出现宕机、发布重启等问题 也不会影响回查

   Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);

   if (channel != null) {

       brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);

   } else {

       LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);

   }

}


public void checkProducerTransactionState(

   final String group,

   final Channel channel,

   final CheckTransactionStateRequestHeader requestHeader,

   final MessageExt messageExt) throws Exception {

   // 给Producer发送消息时 指定类型是CHECK_TRANSACTION_STATE

   RemotingCommand request =

       RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);

   request.setBody(MessageDecoder.encode(messageExt, false));

   try {

       this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);

   } catch (Exception e) {

       log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",

               group, messageExt.getMsgId(), e.toString());

   }

}

9、Producer本地事务回查

public RemotingCommand processRequest(ChannelHandlerContext ctx,

       RemotingCommand request) throws RemotingCommandException {

   switch (request.getCode()) {

       case RequestCode.CHECK_TRANSACTION_STATE:

           // 判断是Broker事务回查 检查本地事务执行状态

           return this.checkTransactionState(ctx, request);

       // ......省略部分代码

   }

   return null;

}

public void checkTransactionState(final String addr, final MessageExt msg,

   final CheckTransactionStateRequestHeader header) {

   Runnable request = new Runnable() {

       //...省略部分代码 下文解析

   };


   // 这里正是用新建TransactionMQProducer时创建的线程池异步执行提高效率

   this.checkExecutor.submit(request);

}

public void run() {

   TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();

   TransactionListener transactionListener = getCheckListener();

   if (transactionCheckListener != null || transactionListener != null) {

       LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;

       Throwable exception = null;

       try {

           if (transactionCheckListener != null) {

               localTransactionState = transactionCheckListener.checkLocalTransactionState(message);

           } else if (transactionListener != null) {

               // 检查本地事务

               log.debug("Used new check API in transaction message");

               localTransactionState = transactionListener.checkLocalTransaction(message);

           } else {

               log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);

           }

       } catch (Throwable e) {

           log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);

           exception = e;

       }


       // 将本地事务状态通知Broker

       // 和第四步Producer第一次尝试通知Broker一样 也是单向发送 可能发送失败

       this.processTransactionState(

           localTransactionState,

           group,

           exception);

   } else {

       log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);

   }

}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
2月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
193 2
|
5月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
5月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
5月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
5月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决