RocketMq- 分布式事务消息

简介: RocketMq- 分布式事务消息

分布式事务消息


分布式事务的来龙去脉

image.png

业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:


1、用户A的账户先扣除100元


2、再把用户B的账户加100元

如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。


但是在微服务架构中因为各个服务都是独立的模块,都是远程调用,没法在同一个事务中,所以就会遇到分布式事务问题。


RocketMQ中的处理方案


image.png

RocketMQ分布式事务方式:把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到RocketMQ,加钱业务订阅“扣款成功消息”,再对用户B加钱(扣款消息中包含了源账户和目标账户ID,以及钱数)


对于扣款业务来说,需规定是先扣款还是先向MQ发消息



场景一:先扣款后向MQ发消息

先扣款再发送消息,万一发送消息失败了,那用户B就没法加钱


场景二:先向MQ发像消息,后扣款

扣款成功消息发送成功,但用户A扣款失败,可加钱业务订阅到了消息,用户B加了钱 问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。 为了解决以上问题,RocketMq把消息分为两个阶段:半事务阶段和确认阶段


半事务阶段:

该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息


确认阶段(commit/rollback):

该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。


image.png

整个流程:


  1. A在扣款之前,先发送半事务消息


  1. 发送预备消息成功后,执行本地扣款事务


  1. 扣款成功后,再发送确认消息


  1. B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱


注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:


  1. 异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。


  1. 异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。


  1. 异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。


image.png

RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。


这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)


image.png


如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。好的方案是设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。当RocketMq事务回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。


如果是银行业务,对数据要求性极高,一般A与B需要进行手动对账,手动补偿。



分布式事务使用案例


本案例简化整体流程,使用A系统向B系统转100块钱为例进行讲解。案例中消息发送方是A系统,消费订阅方是B系统。


image.png


生产者案例代码:

image.png

代码解释如下:


  1. 启动线程池进行定时的任务回查


image.png

2. 设置生产者事务回查监听器

image.png


  1. 开启本地事务,同时发送半事务消息

image.png


image.png


  1. 如果半事务消息失败,则整个事务回滚。




事务回查案例代码:


事务回查是非常有必要的,因为生产者在发送完半事务消息后不能立马确认事务的执行状态,所以事务回查有两个方法,一个是本地事务方法,一个是事务回查方法,目的都是为了确保事务的提交或者回滚。

image.png


image.png

消费者案例代码:


消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。

image.png




使用限制


  1. 事务消息不支持延时消息和批量消息。


  1. 事务回查的间隔时间:BrokerConfig. transactionCheckInterval  通过Broker的配置文件设置好。


  1. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、


  1. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。


  1. 事务性消息可能不止一次被检查或消费。


  1. 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。


  1. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。


8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10天前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
16天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
29天前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
61 2
|
6月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
77 2
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
3月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
3月前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
54 0
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
290 1
|
4月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
861 1