解析 RocketMQ 业务消息——“事务消息”

本文涉及的产品
云原生网关 MSE Higress,422元/月
容器镜像服务 ACR,镜像仓库100个 不限时长
函数计算FC,每月15万CU 3个月
简介: 本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。

作者:合伯


引言:在分布式系统调用场景中存在这样一个通用问题,即在执行一个核心业务逻辑的同时,还需要调用多个下游做业务处理,而且要求多个下游业务和当前核心业务必须同时成功或者同时失败,进而避免部分成功和失败的不一致情况出现。简单来说,消息队列中的“事务”,主要解决的是消息生产者和消费者的数据一致性问题。本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。


点击下方链接,查看视频讲解:

https://yqh.aliyun.com/live/detail/29199


场景:为什么需要事务消息


以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:


  • 主分支订单系统状态更新:由未支付变更为支付成功;
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录;
  • 积分系统状态变更:变更用户积分,更新用户积分表;
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。 


1.png


分布式系统调用的特点是:一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。


传统 XA 事务方案:性能不足


为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务,基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。


基于普通消息方案:一致性保障困难


将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。


2.png

image.gif

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:


  • 消息发送成功,订单没有执行成功,需要回滚整个事务;
  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致;
  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。


基于RocketMQ分布式事务消息:支持最终一致性


上述普通消息方案中,普通消息和订单事务无法保证一致的本质原因是普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。


而基于消息队列 RocketMQ 版实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。image.gif


3.png


消息队列 RocketMQ 版事务消息的方案,具备高性能、可扩展、业务开发简单的优势。


基本原理


概念介绍


  • 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致; 


  • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息; 


  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。


事务消息生命周期


4.png


  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态; 


  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见; 


  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止; 


  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费; 


  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试; 


  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。 


  • 消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。


事务消息基本流程


事务消息交互流程如下图所示:


5.png

image.gif

  1. 生产者将消息发送至 RocketMQ 服务端; 


  1. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息; 


  1. 生产者开始执行本地事务逻辑; 


  1. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者;
  • 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。 


  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;

 

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果; 


  1. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。


实现细节:RocketMQ 事务消息如何实现


6.png


根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。


处理 Half 消息


发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图:


7.jpeg


具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。


Commit 或 Rollback 命令处理

发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图:image.gif


8.jpeg


  • Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费; 


  • Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。 


具体实现在 EndTransactionProcessor 中。


事务消息 check


如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图:


9.jpeg


image.gif事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。


另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。


具体实现参考:


TransactionalMessageServiceImpl#check 方法。


实战:使用事务消息


了解了 RocketMQ 事务消息的原理后,我们看下如何使用事务。首先,我们需要创建一个 “事务消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。


10.png


事务消息相比普通消息发送时需要修改以下几点:


  • 发送事务消息前,需要开启事务并关联本地的事务执行。
  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。 


11.png


当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。


12.png


注意:


  1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟;
  2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端;
  3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。 


今天通过对 RocketMQ 事务消息的介绍,希望能够帮大家对事务消息的原理和应用有更深入的了解,同时也期望 RocketMQ 的事务消息能够帮助您更有效的解决业务问题。如果您对 RocktMQ 的业务消息感兴趣,也欢迎您扫描下方二维码加入钉钉群一起沟通交流~


13.png


点击此处,进入官网了解更多详情~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
83 3
|
23天前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
38 3
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
137 2
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
79 2
|
4月前
|
存储 SQL 关系型数据库
深入解析MySQL事务机制和锁机制
深入解析MySQL事务机制和锁机制
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
71 0
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
365 1

相关产品

  • 云消息队列 MQ
  • 推荐镜像

    更多