【视频】事务消息| 学习笔记

简介: 快速学习【视频】事务消息

开发者学堂课程【消息队列 RocketMQ 消息集成【视频】事务消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/1189/detail/18100


【视频】事务消息

 

内容介绍:

一、背景介绍

二、基本原理

三、具体实现

四、使用方法

 

一、背景介绍


接着上一期 RocketMQ 定时消息的分享,这期分享 RocketMQ 的事务消息。从以下四个方面来展开。首先来分析一下为什么需要事务消息。接着对事务消息的概念和基本流程进行了解,然后来一起看一下 RocketMQ 的事务消息到底是如何实现的。最后,来体验如何使用事务消息。首先想一下为什么需要事务消息。

image.png

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。因此对分布式事务有强烈的诉求。分布式系统调用的特点是,一个核心业务逻辑的执行,需要同时调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务的分支的大事务。基于 XA 分布式事务的方案可以满足业务处理结果的正确性。但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。因此,考虑到性能问题,将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息和订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。看上去很好,但实际上方案存在明显的缺陷。

消息下游分支和订单系统变更的主分支很容易出现不一致的现象。比如说消息发送成功了,订单没有执行成功,需要回滚整个事务。订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。消息发送超时未知,此时无法判断需要回滚订单还是提交订单。所以需要事务消息来解决性能和一致性的两大问题。基于普通消息方案中普通消息和订单事务无法保证一致的本质原因其实是由于普通消息无法像单机数据库事务一样具备提交、回滚和统一协调的能力。因此,RocketMQ 实现的分布式事务消息功能在普通消息的基础上支持了二阶段的提交能力,将二阶段提交和本地事务绑定。实现全局提交结果的一致性。

image.png

因此,RocketMQ 事务消息的方案具有很好的可拓展性,并且能够简化业务代码的开发,还保存着很好的性能。

 

二、基本原理


学习了为什么需要事务消息,接着开始了解一下事务消息的一些基本概念,这对之后更好地理解事务消息的实现有很大的帮助。第一个概念,事务消息:RocketMQ提供类似 XA OpenXA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;第二个概念,半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;第三个概念,消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,使得 RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息时,需要主动向消息生产者询问该消息的最终状态( Commit 或是 Rollback ),该询问过程即消息回查。了解完三个基础的概念之后呢,继续来看一下事务消息的生命周期。

image.png

事务消息的生命周期分为以下几个部分,初始化状态,事务提交状态,提交待消费状态,消息回滚状态,之后是消费中消息提交和消息删除,其与普通的消息一致。这里就不再叙述了,然后来看一下初始化状态。半事务消息被生产者构建并完成初始化,待发送到服务端的状态。事务待提交:消息被发送到服务端,对下游不可见等待消费者获取处理的状态。半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;

消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;了解完消息的生命周期后,让来看看事务消息的处理流程。

image.png

首先,生产者发送半事务消息到 RocketMQ RocketMQ 服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功。收到,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息。接着,生产者开始执行本地的事务逻辑。生产者根据本地事务执行的结果,向服务端提交二次确认结果,Commit或者 Rollback。如果服务端收到了确认的结果为 Commit :服务端将半事务消息标记为可投递,并投递给消费者;如果二次确认的结果为 Rollback :服务端将回滚事务,不会将半事务消息投递给消费者。其次在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为 Unknown 状态,经过一定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。这就是整个事务消息的处理的流程。通过对事务消息的流程的了解,对事务的原理有了更深的认识。

 

三、具体实现


那直接来看一下事务消息的具体代码实现的细节。

image.png

这张图与上述的事务消息流程图相对应。代码主要涉及到SendMessageProcessor就是负责消息发送的处理。EndTransactionProcessor负责事务的提交或者回滚的处理。TransactionalMessageServiceImpl#check负责消息回查逻辑。还有两个事务消息的系统半事务 topic 队列和半事务 op 的队列在之后的详细解说里面使用到。

 

详细的说明一下,这里有三个流程和一个优化,首先讲一下三个流程,第一个是接受和处理半事务消息,half 消息就是半事务消息,第二个 commit roll back 命令处理。第三个就是事务消息的回查和事务消息的 check 逻辑。

首先来看一下第一个接收处理是半事务消息。发送方第一阶段会发送半事务消息到brope brope 处理半事务消息的主要具体的处理流程是先识别消息是事务消息,然后把消息转换成 topic MQCFSHalf topic 半事务消息队列,然后其余的内容保持不变,把它写到了半事务消息队列里面去。这就完成了第一步处理事务的半事务消息。

image.png

然后接着第二步是对 commit roll back 命令的处理,发送方完成了本地事务之后,需要继续向云云端发送二次确认是 commit 或者 roll back brope ,就是RocketMQ 的服务端,由于当前的事务已经完结,brope需要删除原有的半事务消息,由于 RocketMQ opend only 的特性。brope 需要通过 op 消息实现标记删除。Only 特性就是就是基于文件来实现的。commit 如果他收到结果是 commit 的话,brope 会写入 op 消息,op 消息的 body 里面是指 commit 的消息就是半事务的 office size,标记之前的半事务消息已经被删除了,同时 brope 会读取原来的半事务消息把 topic 还原成原来用户的实际的 topic ,重新写到 topic 里面就是写到对应的用户的 topic 的目标里面去,消费者就可以拉起来消费了。如果收到二次确认的结果是 roll back 的话,brope 同样也会写入 op 消息。流程和 commit 的是一样的,但是不会在读取还原半事务消息,这样消费者就不会消费到消息。来看一下事务消息的 check 逻辑。

image.png

当如果客户端发送端的事务消息执行过程中发送 UNKNOWN 命令或者 brope 在发送端发生重启发布等原因,可能会导致 commit roll back 的二次确认的结果丢失。这样,需要增加一个事务的 check 逻辑。事务的逻辑是异步执行的,现在默认的间隔是30秒时间会进行一次 check ,针对于这些可能的半事务消息进行回查。事务消息的回查的流程大致是这样子的,先扫描当前的 op 消息队列读取已经被标记删除的半事务消息的 process。如果发现某个半事务消息没有 op 消息对应标记,并且已经超时了,则会读取该半事务消息重新写到半事务消息队列里面去,并且发送 check 到原发送方面的检查事务。如果没有超时,则会等待后面读取op消息队列获取新的 op 消息。另外为了避免发送方的异常导致长期无法确认的事务的状态。如果半事务消息出生的时间超过最大保留时间,这个消息不再 check 来看一下对 op 消息的一个优化,

image.png

就是在二次确认中,一个 op 消息的实现只是对应了一个半事务消息,这样的话写入就很容易放大为了实现一个 op 消息,可以对应多个半事务消息,就是在 op 消息里写入了多个半事务消息的 cover size。批量的 op 消息实现类似于 TCP 传输的,算法使用了定时和指定大小两个参数来控制op消息的写入。首先会在内存中几率和op 消息对应的内容,如果 op 消息的 body 超过指定的大小,现在默认大小的话是就会写入,会消失,如果定时消息。如果超过定时的时间呢,现在批量写入定时的时间是三秒,就会把定时的不会把批量的op消息刷到文件里面去。效果具体到线上的优化效果,他是半事务消息和半事务消息的显著的 op 的最大比值可是100 : 1。这就是一个 op 消息,对应了100个半事务消息,当然如果是半事务消息越高比例越大。就是优化效果可能会更好。

image.png

讲完了事务消息的实现,还有两个问题需要注意一下,第一个问题是可能会可能会造成较大的磁盘压力,原因是事务消息 check 的时候,如果遇到无法确认的事务的半事务消息会回写到半事务消息的队列里面去。如果发送端异常,有大量的未确定的状态的半事务消息会持续不断的回写,这样会造成一个磁盘的浪费,但会有相应的优化方案,然后另外一个注意点就是事务消息的 check 逻辑,可能会导致消息的延迟,因为事务消息的 check 是异步单线程。并且所有的半事务消息都是共用一个公共的半事务消息队列。如果单个业务方异常了,发送方无法确认事务状态的半事务消息。这样会导致事务消息 check 的时候大量时间都在处理业务方的异常的半事务消息,从而会导致其他业务方的少量的半事务消息的 check 的间隔会变大。

 

四、使用方法


讲完了事务消息的两个注意点之后。来最后来看看如何使用 RocketMQ 的事务消息。

image.png

首先,在控制上创造一个 Topic 要保持消息类型与topic一致。就是现在是要实践事务消息。创建的 topic 是事务消息,如果是之前是定时消息,那创造定时消息类型的 topic 。创建完之后,看一下事务消息发送的实例,

image.png

第一步会构造一个事务消息的检查器,适用于那些异常事务的状态的检查,就是回查逻辑会返回本地的事务的结果。那第二步会绑定事务检查,需要事务检查的 topic 列表。第三步开启事务,然后执行本地事务,然后发送事务消息,提交本地事务,提交消息事务。就是完成了整个事务消息的发送。然后再看一下事务消息的消费

image.png

事务消息的消费是跟普通消息的消费是一样的,但是有一点需要注意的是,事务消息要与其他的类型的 topic 要区分开来。

使用约束消息类型一致性:事务消息仅支持在 MessageType Transaction 的主题内使用,发送的消息的类型必须和主题的类型一致。消息事务性:事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和下游事务的一致性。事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。最佳实践避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据GroupID 去查询生产者客户端。假如跟别人共用,可能会导致消息的一个混乱。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
2月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
4月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
4月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
65 2
|
消息中间件 Kafka 数据库
【JavaP6大纲】分布式事务篇:MQ 事务消息
【JavaP6大纲】分布式事务篇:MQ 事务消息
110 0
|
消息中间件 存储 监控
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
1650 0
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 RocketMQ 开发者
事务消息的实现|学习笔记
快速学习事务消息的实现
事务消息的实现|学习笔记
|
存储 Java 数据库
消息类型-事务消息|学习笔记
快速学习消息类型-事务消息
154 0
消息类型-事务消息|学习笔记