分布式事务消息
分布式事务的来龙去脉
业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:
1、用户A的账户先扣除100元
2、再把用户B的账户加100元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中因为各个服务都是独立的模块,都是远程调用,没法在同一个事务中,所以就会遇到分布式事务问题。
RocketMQ中的处理方案
RocketMQ分布式事务方式:把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到RocketMQ,加钱业务订阅“扣款成功消息”,再对用户B加钱(扣款消息中包含了源账户和目标账户ID,以及钱数)
对于扣款业务来说,需规定是先扣款还是先向MQ发消息
场景一:先扣款后向MQ发消息
先扣款再发送消息,万一发送消息失败了,那用户B就没法加钱
场景二:先向MQ发像消息,后扣款
扣款成功消息发送成功,但用户A扣款失败,可加钱业务订阅到了消息,用户B加了钱 问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。 为了解决以上问题,RocketMq把消息分为两个阶段:半事务阶段和确认阶段
半事务阶段:
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息
确认阶段(commit/rollback):
该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。
整个流程:
- A在扣款之前,先发送半事务消息
- 发送预备消息成功后,执行本地扣款事务
- 扣款成功后,再发送确认消息
- B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:
- 异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。
- 异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。
- 异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)
如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。好的方案是设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。当RocketMq事务回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般A与B需要进行手动对账,手动补偿。
分布式事务使用案例
本案例简化整体流程,使用A系统向B系统转100块钱为例进行讲解。案例中消息发送方是A系统,消费订阅方是B系统。
生产者案例代码:
代码解释如下:
- 启动线程池进行定时的任务回查
2. 设置生产者事务回查监听器
- 开启本地事务,同时发送半事务消息
- 如果半事务消息失败,则整个事务回滚。
事务回查案例代码:
事务回查是非常有必要的,因为生产者在发送完半事务消息后不能立马确认事务的执行状态,所以事务回查有两个方法,一个是本地事务方法,一个是事务回查方法,目的都是为了确保事务的提交或者回滚。
消费者案例代码:
消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。
使用限制
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过Broker的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。