1、RocketMQ事务消息
1.1、应用场景
分布式事务的诉求
- 分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。
- 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
- 主分支订单系统状态更新:由未支付变更为支付成功。
- 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
- 积分系统状态变更:变更用户积分,更新用户积分表。
- 购物车系统状态变更:清空购物车,更新用户购物车记录。
- 传统XA事务方案:性能不足
- 为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。
- 基于普通消息方案:一致性保障困难
- 将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。
- 该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:
- 消息发送成功,订单没有执行成功,需要回滚整个事务。
- 可以在执行订单成功后,然后发消息呀,
- 懂了,两边都成功,或者都失败,如果先执行订单创建逻辑,不能确保都执行成功。
- 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。
- 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。
- 的确存在这个问题
- 基于RocketMQ分布式事务消息:支持最终一致性
- 上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。
- 而基于RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
- RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势
- 系统性能高
- 基于最终一致性的事务消息方案,相比传统XA事务,吞吐性能更高,可扩展性更强。
- 开发成本低
基于事务消息开发逻辑简单,仅需两阶段接口即可完成多个事务分支的协调,无需业务做补偿处理。
下图以创建订单为例对比传统事务和消息队列RocketMQ版事务消息的方案:
2、RocketMQ事务消息原理
2.1、什么是事务消息
- 事务消息是RocketMQ提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
- RocketMQ 在 4.3 版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事务消息解决的是生产端的消息发送与本地事务执行的原子性问题,这里的界限一定要清楚,是确保 MQ 生产端正确无误地将消息发送出来,没有多发,也不会漏发,至于发送后消费端有没有正常的消费消息,这种异常场景将由 MQ 消息消费失败重试机制来保证。
- RocketMQ 设计中的 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者;而 RocketMQ 本身提供的存储机制则为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
2.2、事务消息处理流程
事务消息交互流程如下图所示。
过程如下:
- 生产者将消息发送至RocketMQ服务端。
- 消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。 - 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。
2.3、事务消息生命周期
- 如下图所示
- 初始化
- 半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交
- 半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚
- 第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费
- 第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中
- 消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
- 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
- 消费提交
- 消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
- RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。(3天时间)
- 消息删除
- 消息队列RocketMQ版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
2.4、RocketMQ事务消息的实现流程
以 RocketMQ 4.5.2 版本为例,事务消息有专门的一个队列 RMQ_SYS_TRANS_HALF_TOPIC
,所有的 prepare 消息都先往这里放,当消息收到 Commit 请求后,就将消息转移到真实的 Topic 队列里,供 Consumer 消费,同时向 RMQ_SYS_TRANS_OP_HALF_TOPIC
塞一条消息。简易流程图如下:
- 当应用模块的事务因为中断或者其他的网络原因导致无法立即响应的,RocketMQ 会当做 UNKNOW 处理,对此 RocketMQ 事务消息提供了一个补救方案:定时回查事务消息的事务执行状态,简易流程图如下:
2.5、使用限制
1、消息类型一致性
- 事务消息仅支持在MessageType为Transaction的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
2、消费事务性
- RocketMQ事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
3、中间状态可见性
- RocketMQ事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。
4、事务超时机制
- RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。
2.6、使用示例
事务消息相比普通消息发送时需要修改以下几点:
- 发送事务消息前,需要开启事务并关联本地的事务执行。
- 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。
- 以Java语言为例,使用事务消息示例参考如下:
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。 private static boolean checkOrderById(String orderId) { return true; } //演示demo,模拟本地事务的执行结果。 private static boolean doLocalTransaction() { return true; } public static void main(String[] args) throws ClientException { ClientServiceProvider provider = new ClientServiceProvider(); MessageBuilder messageBuilder = new MessageBuilder(); //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。 Producer producer = provider.newProducerBuilder() .setTransactionChecker(messageView -> { /** * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。 * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。 */ final String orderId = messageView.getProperties().get("OrderId"); if (Strings.isNullOrEmpty(orderId)) { // 错误的消息,直接返回Rollback。 return TransactionResolution.ROLLBACK; } return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; }) .build(); //开启事务分支。 final Transaction transaction; try { transaction = producer.beginTransaction(); } catch (ClientException e) { e.printStackTrace(); //事务分支开启失败,直接退出。 return; } Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。 .addProperty("OrderId", "xxx") //消息体。 .setBody("messageBody".getBytes()) .build(); //发送半事务消息 final SendReceipt sendReceipt; try { sendReceipt = producer.send(message, transaction); } catch (ClientException e) { //半事务消息发送失败,事务可以直接退出并回滚。 return; } /** * 执行本地事务,并确定本地事务结果。 * 1. 如果本地事务提交成功,则提交消息事务。 * 2. 如果本地事务提交失败,则回滚消息事务。 * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。 */ boolean localTransactionOk = doLocalTransaction(); if (localTransactionOk) { try { transaction.commit(); } catch (ClientException e) { // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。 e.printStackTrace(); } } else { try { transaction.rollback(); } catch (ClientException e) { // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。 e.printStackTrace(); } } }
2.7、使用建议
1、避免大量未决事务导致超时
- RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。
2、正确处理“进行中”的事务
- 消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。
- 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:
- 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
- 程序能正确识别正在进行中的事务。
Action1:发送事务消息为什么必须要实现回查 Check 机制?
- 当步骤(1)中 Half 消息发送完成,但本地事务返回状态为
TransactionStatus.Unknow
,或者应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条 Half 状态的消息的状态是未知的。 因此 MQ Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。
Action2:Check 被回调时,业务逻辑都需要做些什么?
MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。 MQ 发送事务消息时需要实现 LocalTransactionChecker
接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:
- (1) 检查该 Half 消息对应的本地事务的状态(commited or rollback);
- (2) 向 MQ Broker 提交该 Half 消息本地事务的状态。
3、Springboot 整合 RocketMQ 实现事务消息
该部分将从 “下订单 + 扣减库存” 的案例来介绍 SpringBoot 如何整合 RocketMQ 并使用事务消息保证最终一致性。核心思路是订单服务(生产端)向 RocketMQ 发送库存扣减消息,再执行本地订单生成逻辑,最后交由 RocketMQ 通知 库存服务 扣减库存并保证库存扣减消息被正常消费。
案例中使用到的服务分为两个,订单服务和库存服务;涉及到的数据库表主要有三个,订单表、存储表,本地事务状态表。由于这几个表都比较简单,这里就不将对应的建表语句粘贴出来了,同样对应的 Pojo对象、Dao层、Service层 代码也不粘贴出来了,下面只展示核心逻辑的代码。
3.1、启动 RocketMQ 服务端:
RocketMQ的安装与部署请参考这篇文章:https://blog.csdn.net/a745233700/article/details/122531859
3.2、在父pom文件中引入依赖:
<!-- rocketmq 事务消息 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
3.3、生产端代码:
- 生产端的核心逻辑就是向 RocketMQ 投递事务消息,并执行本地事务,最后将本地事务的执行结果通知到 RocketMQ
(1)RocketMQ相关配置:
在 application.properties 配置文件中添加以下配置:
rocketmq.name-server=172.28.190.101:9876 rocketmq.producer.group=order_shop
(2)创建一个监听类:
- 实现 TransactionListener 接口,在实现的数据库事务提交方法
executeLocalTransaction()
和回查事务状态方法checkLocalTransaction()
中模拟结果
/** * rocketmq 事务消息回调类 */ @Slf4j @Component public class OrderTransactionListener implements TransactionListener { @Resource private ShopOrderMapper shopOrderMapper; /** * half消息发送成功后回调此方法,执行本地事务 * * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ @Override @Transactional public LocalTransactionState executeLocalTransaction(Message message, Object arg) { log.info("开始执行本地事务:订单信息:" + new String(message.getBody())); String msgKey = new String(message.getBody()); ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class); int saveResult; LocalTransactionState state; try { //修改为true时,模拟本地事务异常 boolean imitateException = true; if(imitateException) { throw new RuntimeException("更新本地事务时抛出异常"); } // 生成订单,本地事务的回滚依赖于DB的ACID特性,所以需要添加Transactional注解。当本地事务提交失败时,返回ROLLBACK_MESSAGE,则会回滚rocketMQ中的half message,保证分布式事务的一致性。 saveResult = shopOrderMapper.insert(shopOrder); state = saveResult == 1 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; // 更新本地事务并将事务号持久化,为后续的幂等做准备 // TransactionDao.add(transactionId) } catch (Exception e){ log.error("本地事务执行异常,异常信息:", e); state = LocalTransactionState.ROLLBACK_MESSAGE; } //修改为true时,模拟本地事务超时,对于超时的消息,rocketmq会调用 checkLocalTransaction 方法回查本地事务执行状况 boolean imitateTimeout = false; if(imitateTimeout){ state = LocalTransactionState.UNKNOW; } log.info("本地事务执行结果:msgKey=" + msgKey + ",execute state:" + state); return state; } /** * 回查本地事务接口 * * @param messageExt 通过获取 transactionId 来判断这条消息的本地事务执行状态 * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt){ log.info("调用回查本地事务接口:msgKey=" + new String(messageExt.getBody())); String msgKey = new String(messageExt.getBody()); ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class); // 备注:此处应使用唯一ID查询本地事务是否执行成功,唯一ID可以使用事务的 transactionId。但为了验证方便,只查询DB的订单表是否存在对应的记录 // TransactionDao.isExistTx(transactionId) List<ShopOrderPojo> list = shopOrderMapper.selectList(new QueryWrapper<ShopOrderPojo>() .eq("shop_id", shopOrder.getShopId()) .eq("user_id", shopOrder.getUserId())); LocalTransactionState state = list.size() > 0 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; log.info("调用回查本地事务接口的执行结果:" + state); return state; } }
- 为了方便验证,上面 Demo 使用了两个 boolean 变量 imitateException、imitateTimeout 分别模拟了事务执行异常和超时的情况,只需要将布尔值设置为 true 即可。
(3)投递事务消息:
import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; @Slf4j @Service public class ShopOrderServiceImpl extends ServiceImpl<ShopOrderMapper, ShopOrderPojo> implements ShopOrderService { @Resource private RocketMQTemplate rocketMQTemplate; @Autowired private OrderTransactionListener orderTransactionListener; /** * 发送事务消息 */ @Override public boolean sendOrderRocketMqMsg(ShopOrderPojo shopOrderPojo) { String topic = "storage"; String tag = "reduce"; // 设置监听器,此处如果使用MQ其他版本,可能导致强转异常 ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(orderTransactionListener); //构建消息体 String msg = JSONObject.toJSONString(shopOrderPojo); org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build(); //发送事务消息,由消费者进行进行减少库存 TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tag , message, null); log.info("Send transaction msg result: " + sendResult); return sendResult.getSendStatus() == SendStatus.SEND_OK; } }
3.4、消费端代码:
消费端的核心逻辑就是监听 MQ,接收消息;接收到消息之后扣减库存
(1)RocketMQ相关配置:
在 application.properties 配置文件中添加以下配置:
rocketmq.name-server=172.28.190.101:9876 rocketmq.consumer.group=order_shop
(2)消费监听类:
import com.alibaba.fastjson.JSONObject; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 库存管理消费者类 **/ @Component @RocketMQMessageListener (consumerGroup = "order_storage", topic = "storage") public class StorageConsumerListener implements RocketMQListener<String> { @Resource private TStorageService tStorageService; /** * rocketMQ消费者 */ @Override public void onMessage(String message) { System.out.println("消费者开始消费:从MQ中获取的消息是:" + message); ShopOrderPojo shopOrder = JSONObject.parseObject(message, ShopOrderPojo.class); // 1、幂等校验,防止消息重复消费--此处省略相关的代码逻辑: // TransactionDao.isExistTx(transactionId) // 2、执行消息消费操作--减少商品库存: TStoragePojo shop = tStorageService.getById(shopOrder.getShopId()); shop.setNum(shop.getNum() - 1); boolean updateResult = tStorageService.updateById(shop); // 3、添加事务操作记录--此次省略代码: // TransactionDao.add(transactionId) System.out.println("消费者完成消费:操作结果:" + updateResult); } }
至此,一个完整的基于 RocketMQ 事务消息实现的分布式事务的最终一致性就完成了。
Action1:政采云对事务消息的使用
- 供应商入驻如商品店铺的创建、默认仓库的创建