多类型业务消息专题-事务消息 | 学习笔记

简介: 快速学习多类型业务消息专题-事务消息

开发者学堂课程【 RocketMQ 消息集成:多类型业务消息专题多类型业务消息专题-事务消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址https://developer.aliyun.com/learning/course/1161/detail/17324


多类型业务消息专题-事务消息

 

内容介绍:

一、背景介绍:为什么需要事务消息

二、基本原理:事务消息的概念和基本流程

三、具体实现:事务消息是如何实现的

四、使用方法:如何使用事务消息

 

这期分享 RocketMQ 的事务消息。将从以下四个方面来展开。

1.  背景介绍:为什么需要事务消息

2.  基本原理:事务消息概念和基本流程

3.  具体实现:事务消息是如何实现的

4.  使用方法:如何使用事务消息

首先,分析一下为什么需要事务消息,接着对事务消息的概念和基本流程进行了解,然后来一起看一下 RocketMQ 的事务消息到底是如何实现的。最后,体验一下如何使用 RocketMQ 的事务消息。

 

一、   背景介绍::为什么需要事务消息


image.png


首先想一下为什么需要事务消息。以电商交易场景为例,用户支付订单这一核心操作会涉及到下游物流发货、积分变更、购物车状态、轻工等多个子系统的变更,因此对分布式事务有强烈的诉求。分布式系统调用的特点是一个核心业务逻辑的执行,需要同时调用多个下游业务进行处理。如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。典型的方案是基于 XA 协议的分布式事务系统来实现。将四个调用分支分装成包含四个独立事务的分支的大事务。基于差分布式事务的方案可以满足业务处理的正确性。但是最大的缺点是多分支环境下的资源锁定范围大,并发度的,随着下游分支的增加,系统的性能会变得越来越差,因此,考虑到性能问题,基于上述的差数的方案进行简化。


image.png


将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息和订单表事务,充分利用消息异步化的能力,缩短链路,提高并发度,看上去很好。但实际上方案存在明显的缺陷消息,下游分支和订单系统变更的主分支很容易出现不一致的现象。比如消息发送成功了,订单没有执行成功,需要回滚,整个事务订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。消息发送超时,未知,此时无法判断需要回滚订单还是提交订单,所以需要事务消息来解决性能和一致性的两大问题。至于普通消息方案中普通消息和订单事务无法保证一致的本质原因,其实是由于普通消息无法像单机数据库事务一样具备提交、回滚和统一协调的能力,因此 RocketMQ 实现的分布式事务消息功能,在普通消息的基础上支持了二阶段的提交能力,强二阶段提交和本地事务绑定,实现全局提交结果的一致性。因此 RocketMQ 具有很好的可扩展性,并且能够简化业务代码的开发,还保存着很好的性能。


image.png


二、基本原理:事务消息的概念和基本流程


开始了解一下事务消息的一些基本概念,这对之后更好地理解事务消息的实现有很大的帮助。第一个概念,事务消息。RocketMQ 提供的类似于 openx1 的分布式事务功能,通过两个事务消息能够达到分布式事务的最终一致性。第二,概念办事务消息暂不能同意的消息,生产者已经成功将消息发送到 MQ 服务端,但是RocketMQ 服务端未收到,未收到生产者对消息的二次确认。此时,该消息被标记成暂不能逃避的状态,处于该状态下的消息极为办事处消息,第三个概念消息回查,由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失。使得 RocketMQ 服务端通过扫描某条消息长期属于半数消息时,会主动的向消息的生产者询问该消息的最终状态,到底是 commit 或者 roll back 。该询问的过程即为消息回查。

了解三个基础的概念之后,继续来看一下事务消息的生命周期:


image.png


生命周期分为以下几个部分,初始化状态,事务提交、状态提交,待消费状态消息。回滚状态之后是消费中消息提交和消息删除,其与普通的消息一致的,这里就不再叙述了。初始化状态:事务所办事务消息被生产者构建并完成初始化带发送到服务端的状态。事务待提交状态:消息被传输到服务端对下游不可见,等待消费者获取处理的状态。办事处消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储的系统中,等待第二阶段本地事务返回执行结果后再提交此时消息对下游消费者不可见。消息回滚状态:如果是第二阶段,如果执行的事务的结果明确为回滚,服务端会将是办事务消息回滚该事务消息流程,终止。提交待消费状态:消费第二阶段,如果执行的结果明确为提交,服务端会将办事处消息重新存储到普通存储系统中,此时消息对下游的消费者可见。等待被消费的者消费。

了解完消息的生命周期后,看看事务消息的处理流程。


image.png


首先,生产者发送办事处消息到 RocketMQ 的服务端,RocketMQ 服务端会将消息持久化,成功之后会返回生产者确认消息,已经成功。收到此时,消息被标记为暂不能投递。接着生产者开始执行本地的事务逻辑。生产者根据本地事务执行的结果向服务端提交二次确认结果,Commit 或者收 Rollback 。如果服务端收到了确认的结果。为 commit 的服务端会将办事处消息标记为可投递并投递给消费者。如果二次确认的结果为 Rollback ,服务端将回滚事务,不会将办事务消息投递给消费者。其次在断网或者生产者应用重启的特殊情况下。如果服务端没有收到发送者提交的二次确认的结果,结果为 no 状态,经过一定时间后,服务端根据将会对消费叫消息的生产者发起消息回滚。生产者收到消息回答后,需要检查对应的消息的,本地事务执行的最终结果,甚至在根据检查到的本地事务的。最终状态再次提交,二次确认,服务端仍然按照步骤四,对半事务消息进行处理。就是整个事务消息的处理的流程。通过对事务消息的流程的了解,对事务的原理,应该有了更深的认识。

 

三、具体实现:事务消息是如何实现的


看一下事务消息的具体代码实现的细节:

image.png

这张图与上述的事务消息流程图相对应。代码主要涉及到 SendMassageProcess ,负责消息发送的处理。End process and decision process ,负责事务的提交或者回滚的处理。Transition message service 里面 check 负责消息回查逻辑,还有两个事务消息的系统, Topic 队列,办事 Target topic 队列,半事务 op 的队列。之后详细讲解说,里面都会有停机的或者使用到。详细说明一下,这里有三个流程和一个优化。首先讲一下三个流程,第一个是接收和处理办事务消息,Half消息就是半数据消息,第二个,Commit rollback 命令处理,第三个就是事务消息的回查,旧事务消息的缺乏逻辑。


image.png

首先看一下第一个接收处理办事务消息half消息发送方的阶段,会发送半事务消息到 brokebroke 处理半事务消息,它的主要具体的处理流程,首先先识别消息是事务消息,然后把消息转换成 TopicA Tag Property Body ,半事处消息队列,topic其余的内容保持不变,把它写到了半事处消息队列里面去。这就完成了第一步处理事务的半事务消息。


image.png


第二步,是对 commit rollback 命令的处理。发送方完成了本地事务之后,需要继续向云端发送二次确认是 commit 或者 rollback RocketMQ 服务端。由于当前的事务已经完结,Broker 需要删除原有的半事务消息,由于 RocketMQ appendonly 的特性。broke 需要通过 op 消息实现,标记删除 open only 特性,基于文件来实现的。如果收到的结果是 commitBroke 会写入 op 消息,op 消息的 body 里面是指定的 commit 消息,标记之前的半事务消息已经被删除了,同时会读取原来的半事务消息,topic 还原成原来用户的实际 topic ,重新写到 commit里面,就是写到对应的用户的 topic 目标,消费者就可以拉起来消费了。如果收到二次确认的结果是 rollbackBrook 同样也会写入 op 消息流程,和 commit 是一样的,但是不会再读取还原半事务消息,这样消费者就不会消费到这消息了。

大致学习完了 commit rollback 的处理流程之后,看下事务消息 Check 逻辑:


image.png

如果客户端发送端的事务消息执行过程中发生了在发送端发生重启,可能会导致commit rollback 的二次确认的结果,会使删除标记的消息,可能也会丢失,需要增加一个事务的缺乏逻辑,事务的缺乏逻辑是异步执行的,现在默认的间隔是30秒,时间会进行一次,Check 可针对于这些可能的就是没有得 op 消息,半事务消息进行回查。事务消息的回查的流程大致是这样,先会扫描当前的消息队列,读取已经被标记删除的内容,半事务消息的 QSYS ,如果发现某个半事务消息没有 op消息对应标记,并且已经超时了,则会读取该半事务消息,重新写到半事务消息队列里面去,并且发送 Check 命令道原发送方检查事故状态,如果没有超时,则会等待后面读取 op 消息队列获取新的消息另外,为了避免发送方的异常导致长期无法确认的事务的状态,如果半事务消息的出生时间Time超过最大的保留时间会自动跳过这条消息,不在进行 Chec k

image.png


学习完了三个流程之后,看一下 Batch op 消息的一个优化,在 commitrollback二次确认中,一个 op 消息的实现,只是对应了一个半事务消息,这样写入就很容易放大,为了实现一个 APP 消息可以对应多个半事务消息,就是在APP消息里写入了多个半事务消息的 cover size 。批量的 op 消息实现类似于 TCP 传输的 necklace 算法,使用了定时和指定大小两个参数来控制, op 消息的写入,首先会在内存中聚合 APP 消息对应的内容,如果 op 消息的 body 超过指定的大小,如果定时消息如果超过定时时间批量写入,定时的时间是三秒就不会把批量 APP 消息下刷到文件里面。效果具体上到线上的优化效果,半事务消息和半事务效率的 APP的写入的最大比值可是100 : 1,就是一个APP消息对应了100个半事务消息。当然如果是半事务消息的 TCL 越高,比例肯定会越大。优化效果可能会更好。

还有两个问题需要注意一下,第一个问题是可能会造成较大的磁盘压力。原因是事务消息 check 的时候,如果遇到无法确认半事务消息,半事务消息队列里面去,如果发送方异常有大量的未确定的状态半事务消息会持续不断的回血,这样会造成一个磁盘的浪费,会有相应的优化方案,这里就不详细展开了。然后另外一个注意点是事务消息,这逻辑的话 Check 可能会导致消息的延迟,因为事务消息的check是单线程的,并且所有的办事效益都是共用一个公共的办事效率队列,如果单个业务到异常了,发送方无法确认事务状态的半事务消息,这样会导致半事务消息的Check 和半事务效率的时候,大量时间都处。在处理业务方的异常的半事务消息,从而会导致其他业务方的少量的半事务消息的 check 的间隔会变大。

 

四、  使用方法:如何使用事务消息


首先,创建一个事务 Topic


image.png


要保持消息类型与 topic 一致,就是现在是要实践事务消息,创新的 topic 事务消息,如果之前做定时消息,创造定时消息类型的 topic 。

创建完之后,看一下事务消息发送到实例,第一步会构造一个事务消息的检察机,适用于那些异常事务的状态的检查,回答逻辑会返回本地的事务的一个结果一个回调,第二步,会绑定事务检查需要事务检查的 topic talk 列表。第三步,开启事务然后执行本地事务,然后发送事务消息,然后提交本地事务,消息事务,就是完成了整个消息的事务消息的发送。然后再看一下事务消息的消费,是不是消费,等到事务消息消费,跟普通消息的消费是一样的,但是有一点需要注意的是,剩下的consumer group 要与其他类型的 topic 要区分开。

//事务消息发送实例

//1.构造事务检查器

Transactionchecker checker new Transactionchecker(){

@Override

Public      TransactionResolution       check(Messageview messageview){

//异常状态的事务检查,返回本地事务结果。

return TransactionResolution.COMMIT;

}

};

ProducerBuilder producerBuilder null;

//2,绑定事务检查器以及需要事务检查的主题列表。

Producer producerl1=producerBuilder.setTransactionChecker(checker)

.setTopics("TransactionTopic").build();

//3.开启事务。

Transaction transaction = produr1.beginTransaction();

//3.1执行本地事务。

//.…

//3.2发送事务消息。

Message messagel messageBuilder.build();

SendReceipt       sendReceipt1 producer1.send(messagel,transaction);

//3.3提交本地事务。

//.…

/3.4提交消息事务。

transaction.commit();

public static void main(String[]args){ClientServiceProvider

provider ClientServiceProvider.loadService();

StaticSessionCredentialsProvider staticSessionCredentialsProvider new

StaticSessioncredentialsProvider("AccessKey","SecretKey");

ClientConfiguration

clientConfiguration clientConfiguration.newBuilder()

l  setEndpoints("AccessPoint")

l  setCredentialProvider(staticSessionCredentialsProvider)

l  build();

try(

PushConsumerignored=provider.newPushConsumerBuilder()

l  setclientConfiguration(clientConfiguration)

l  setConsumerGroup("ConsumerGroup")

l  setSubscriptionExpressions(Collections.singletonMap("TransactionTopic",new FilterExpression()))

l  setMessageListener(messageview ->{

LOGGER.info("Received message,message=()",messageview);

return ConsumeResult.SUCCESS;

})

l  build()){

}catch (ClientException e){

//捕获异常,进行异常处理

}

}

 

使用约束

ü  消息类型一致性:事务消息仅支持在 MessageType

Transaction 的主题内使用,发送的消息的类型必须和主题的类型一致。

ü  消息事务性:事务消息保证本地主分支事务和下游消息发送事务

的一致性,但不保证消息消费结果和下游事务的一致性。

ü  事务超时机制:半事务消息被生产者发送服务端后,如果在指定

时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

 

最佳实践

ü  避免大量未决事务导致超时:在事务提交阶段异常的情况下发起

事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能纸领,容易导致事务处理延迟。

ü  事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Kafka 测试技术
MQ 学习日志(七) 保证消息消费的顺序性
保证消息消费的顺序性
195 0
|
消息中间件 存储 NoSQL
该如何保证消息不被重复消费
该如何保证消息不被重复消费
216 0
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
475 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 监控
MQ的作用及如何解决消息队列的丢失、重复和积压问题
引入 MQ 消息中间件最直接的目的是:做系统解耦合流量控制,追其根源还是为了解决互联网系统的高可用和高性能问题。 系统解耦:用 MQ 消息队列,可以隔离系统上下游环境变化带来的不稳定因素,比如京豆服务的系统需求无论如何变化,交易服务不用做任何改变,即使当京豆服务出现故障,主交易流程也可以将京豆服务降级,实现交易服务和京豆服务的解耦,做到了系统的高可用。
210 0
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 存储 数据库
解析 RocketMQ 业务消息——“事务消息”
本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。
解析 RocketMQ 业务消息——“事务消息”
|
存储 Java 数据库
消息类型-事务消息|学习笔记
快速学习消息类型-事务消息
166 0
消息类型-事务消息|学习笔记
|
消息中间件 运维 监控
多类型业务消息专题-普通消息 | 学习笔记(一)
快速学习多类型业务消息专题-普通消息
170 0
 多类型业务消息专题-普通消息 | 学习笔记(一)
|
消息中间件 存储 运维
多类型业务消息专题-普通消息 | 学习笔记(二)
快速学习多类型业务消息专题-普通消息
135 0
多类型业务消息专题-普通消息 | 学习笔记(二)
|
存储 消息中间件 Linux
多类型业务消息专题-顺序消息 | 学习笔记
快速学习多类型业务消息专题-顺序消息
多类型业务消息专题-顺序消息 | 学习笔记