3 张图带你彻底理解 RocketMQ 事务消息

简介: 3 张图带你彻底理解 RocketMQ 事务消息

大家好,我是君哥。

事务消息是分布式事务的一种解决方案,RocketMQ 有成熟的事务消息模型,今天就来聊一聊 RocketMQ 事务消息实现机制。

假如有一个电商场景,用户下单后,账户服务从用户账户上扣减金额,然后通知库存服务给用户发货,这两个服务需要在一个分布式事务内完成。

这时,账户服务作为 Producer,库存服务作为 Consumer,见下面消息流程:

微信图片_20221213115117.png

  1. 账户服务作为 Producer 向 Broker 发送一条 half 消息;
  2. half 消息发送成功后,执行本地事务,执行成功则向 Broker 发送 commit 请求,否则发送 rollback 请求;
  3. 如果 Broker 收到的是 rollback 请求,则删除保存的 half 消息;
  4. 如果 Broker 收到的是 commit 请求,则保存扣减库存消息(这里的处理是把消息从 half 队列投递到真实的队列),然后删除保存的 half 消息
  5. 如果 Broker 没有收到请求,则会发送请求到 Producer 查询本地事务状态,然后根据 Producer 返回的本地状态做 commit/rollback 相关处理。

1 half 消息

上面电商的案例中,RocketMQ 解决分布式事务的第一步是账户服务发送 half 消息。

首先看官网一个发送事务消息的示例:

public static void main(String[] args) throws MQClientException, InterruptedException {
 TransactionListener transactionListener = new TransactionListenerImpl();
 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
   Thread thread = new Thread(r);
   thread.setName("client-transaction-msg-check-thread");
   return thread;
  }
 });
 producer.setExecutorService(executorService);
 producer.setTransactionListener(transactionListener);
 producer.start();
 String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
 for (int i = 0; i < 10; i++) {
  try {
   Message msg =
    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.sendMessageInTransaction(msg, null);
   System.out.printf("%s%n", sendResult);
   Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
   e.printStackTrace();
  }
 }
 for (int i = 0; i < 100000; i++) {
  Thread.sleep(1000);
 }
 producer.shutdown();
}

上面的代码中 Producer 有一个 TransactionListener 属性,这个由开发者通过实现这个接口来自己定义。这个接口有两个方法:

  • 提交本地事务 executeLocalTransaction
  • 检查本地事务状态 checkLocalTransaction

下面代码是发送事务消息的方法:

//类 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException {
 TransactionListener transactionListener = getCheckListener();
 //省略验证逻辑
 SendResult sendResult = null;
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }
 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 //省略发送结果处理
 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
 }
 TransactionSendResult transactionSendResult = new TransactionSendResult();
 //省略封装属性
 return transactionSendResult;
}

从这段代码中看到,在发送消息前,给消息封装了一个属性PROPERTY_TRANSACTION_PREPARED,通过这个属性可以找到 Broker 端的处理。

Broker 保存 half 消息时,把消息 topic 改为 RMQ_SYS_TRANS_HALF_TOPIC,然后把消息投递到 queueId 等于 0 的队列。投递成功后给 Producer 返回 PutMessageStatus.PUT_OK。代码如下:

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
 return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  String.valueOf(msgInner.getQueueId()));
 msgInner.setSysFlag(
  MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
 msgInner.setQueueId(0);
 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
 return msgInner;
}

2 执行本地事务

上一节讲到,Producer 发送事务消息时,会给一个 transactionListener,发送 half 消息成功后,会通过 transactionListener 的 executeLocalTransactionBranch 提交本地事务,代码如下:

//类 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException {
 //省略部分代码
 SendResult sendResult = null;
 //省略部分代码
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }
 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 Throwable localException = null;
 switch (sendResult.getSendStatus()) {
  case SEND_OK: {
   try {
    //省略部分代码
    if (null != localTransactionExecuter) {
        //这个分支已经废弃
     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
    } else if (transactionListener != null) {
     //执行本地事务
     localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    //省略部分代码
   } catch (Throwable e) {
   }
  }
  break;
  //省略其他 case
 }
 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
  log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
 }
    //省略部分代码
 return transactionSendResult;
}

从上面代码中可以看到,本地事务执行结束后,会调用一个 endTransaction 方法,这个就是向 Broker 发送 commit/rollback,也可能发送 UNKNOW,封装到 requestHeader 的 commitOrRollback 的属性中。这个请求的请求码是 END_TRANSACTION。

3 commit/rollback 处理

根据请求码 END_TRANSACTION 可以找到 Broker 端对事务消息的处理。代码如下:

//EndTransactionProcessor 类
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
 RemotingCommandException {
 //省略部分逻辑
 OperationResult result = new OperationResult();
 if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
     //查找出 half 消息
  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
      //检查 groupId 和消息偏移量是否合法
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
    //删除PROPERTY_TRANSACTION_PREPARED属性
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
    RemotingCommand sendResult = sendFinalMessage(msgInner);
    if (sendResult.getCode() == ResponseCode.SUCCESS) {
        //删除 half 消息
     this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    return sendResult;
   }
   return res;
  }
 } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
   }
   return res;
  }
 }
 response.setCode(result.getResponseCode());
 response.setRemark(result.getResponseRemark());
 return response;
}

这段代码逻辑很清晰,首先查找出 half 消息,然后对查找出的消息进行检查(groupId 和消息偏移量是否合法),如果是 commit,则去除事务消息准备阶段属性,重新把消息投递到原始队列,然后删除 half 消息。如果是 rollback,则直接删除 half 消息。

注意:对于 UNKNOW 的类型,这里直接返回 null,上面代码没有贴出来。

4 check 事务状态

Broker 初始化的时候,会初始化一个 TransactionalMessageServiceImpl 线程,这个线程会定时检查过期的消息,通过向 Producer 发送 check 消息来获取事务状态。代码如下:

//TransactionalMessageCheckService
protected void onWaitEnd() {
    //超时时间,默认 6s
 long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
 //最大检查次数,默认 15
 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
 long begin = System.currentTimeMillis();
 log.info("Begin to check prepare message, begin time:{}", begin);
 this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
 log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

这里有两个参数需要注意:

  • 事务消息超时时间,超时后会向 Producer 发送 check 消息检查本地事务状态,默认 6s;
  • 最大检查次数,Broker 每次向 Producer 发送 check 消息后检查次数加 1,超过最大检查次数后 half 消息被丢弃,默认最大检查次数是 15;注意:这里的丢弃是把消息写入了一个新的队列,Topic 为 TRANS_CHECK_MAX_TIME_TOPIC,queueId 为 0。
  • 文件保存时间,默认72 小时。

检查事务消息的流程如下:

微信图片_20221213115153.png


Producer 收到 check 消息后,最终调用 TransactionListener 中定义的 checkLocalTransaction 方法,查询本地事务执行状态,然后发送给 Broker。

需要注意的是,check 消息发送给 Broker 时,会在请求 Header 中给 fromTransactionCheck 属性赋值为 true,以标记是 check 消息。

Broker 收到 check 响应消息后,处理逻辑跟第 3 节的处理逻辑一样,唯一不同的是,这里针对 check 消息和非 check 消息打印了不同的日志。

5 总结

从上面代码的分析可以看到,RocketMQ 的事务消息实现机制非常简洁。使用事务消息时自己定义 TransactionListener,实现执行本地事务 executeLocalTransaction 和检查本地事务状态 checkLocalTransaction 这两个方法,然后使用 TransactionMQProducer 进行发送。

最后,附一张 Producer 端的 UML 类图:

微信图片_20221213115216.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
150 2
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
393 1
|
5月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
1005 1
|
6月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
655 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景