消息队列进阶-2.分布式事务解决思路汇总

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 消息队列进阶-2.分布式事务解决思路汇总

分布式事务


分布式事务产生的原因


在实际的开发当中,分布式事务产生的原因主要是来源于存储、服务的拆分。


存储层拆分


存储层拆分,最典型的就是数据库分库分表。(一般单表数据达到千万级就要进行拆分)


服务层拆分


服务层拆分也就是业务的服务化,系统架构的演进是从集中式到分布式,业务功能之间越来越解耦。

每个微服务都只能连接自己的库,不能进行跨库访问。


分布式事务解决方案


两阶段提交


非常经典的强一致性、中心化的原子提交协议


存在一个节点作为协调者,其它节点作为参与者,且协调者和参与者节点可以正常的网络通信。


所有的节点都是采用预写式日志,日志被写入后被保存在可靠的存储设备商,即使节点损坏也不会导致日志数据的丢失。


所有节点都不会永久性的损坏,即使损坏后仍然是可以恢复的


两个节点:commit-request,commit阶段

两阶段提交也会存在一定的问题:


1.我们在执行的过程中,所有的参与者都处于事务独占的状态,当我们参与了占有独占资源的时候,那么只能被阻塞。


2.协调者会存在单点故障,因为没有特定的超时机制,所以就会导致参与者会一直阻塞下去。


3.commit可能出现数据不一致,比如最后commit阶段a收到了,b没有收到,会一直处于阻塞状态


三阶段提交


三阶段提交协议在协调者和参与者中都引入了超时机制,并且把两阶段提交协议的第一个阶段分成了两步:询问,然后再锁资源,最后真正提交,增加了一个阶段的机会来处理异常情况,提交了系统的容错性。


TCC事务模型


TCC提出一种新的事务模型,基于 业务层面的事务定义 ,锁粒度完全由业务自己控制,目的是解决复杂业务中,跨表跨库等大颗粒度资源锁定问题。


TCC把事务运行工程分为了Try(一般是做一些预留资源或者锁定资源的动作) ,如果每一个分支都可以锁定成功,那么就可以执行下一步的confirm,如果有失败的呢,就要执行cancel。每个阶段的逻辑由业务代码控制,避免了长事务,可以获取更高的性能。


这个主要是基于业务层面的事务控制,每个事务分支(订单服务、库存服务)都需要自己去实现对应的try。confirm/cncel 接口,侵入比较强。

TM:Transaction Manager 事务管理器,负责整个TCC事务的协调控制。


TC:Transaction Coordinator,事务协调者


  • Try阶段:调用try接口,尝试执行业务,完成所有的业务检查、预留业务资源。


订单服务:添加一个预备状态,修改为Updating,冻结当期订单的操作,而不是直接修改为支付成功。


库存服务:冻结库存,扩展字段,可以添加新的库存冻结表。


  • Comfirm/Cancel 阶段:两个是互斥的,只能执行其中的一个,都需要幂等性,要允许失败重试。


Confirm:把前面的try阶段锁定的资源提交,类比数据库的Commit操作。在支付场景中,包括订单状态从更新中更新为支付成功。库存数据扣减在try冻结的库存。


Cancel:业务上的回滚操作。订单服务,撤销预备状态,还原为待支付状态或者取消状态,库存服务删除冻结的库存,添加到可用的库存中。


本地消息表方案


本地消息表的方案最初是由ebay的工程师提出,核心思想是将分布式事务拆分成本地事务来解决,通过消息日志的方式来异步执行。


本地消息表方法:借助于MQ,在本地数据库中新建一个消息表。


本地消息表是一种业务耦合的设计,消息生产方需要额外建一个事务消息表,并记录消息发送状态,消息消费方需要处理这个消息,并完成自己的业务逻辑,另外会有一个异步机制来定期扫描未完成的消息,确保最终一致性。

优点:实现逻辑比较简单,开发成本比较低


缺点:与业务场景绑定,高耦合。本地消息表与业务表在一个库中,占用业务系统资源,影响数据库性能。


其弊端在于它的伸缩性是受限于单个数据库性能以及容量的,因为本地消费表存储在单个数据库中,数据库的性能和容量其实就限制了我的扩展的能力。如果消费量增加,或者我的系统负载增加,可能需要升级数据库的硬件或者说进行扩展。这种扩展方式其实有一定的物理限制的。


事务消息方案


事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能。


分布式事务中的补偿操作


在分布式系统中,由于各个服务节点的执行可能存在时延、失败等情况,会导致分布式事务的各个参与者节点的状态不一致,从而使得分布式事务无法完成。为了解决这个问题,我们需要引入一种机制来进行补偿。


补偿机制指的是,在分布式事务出现异常时,通过一系列的操作,尽可能使得分布式事务状态回滚到之前的状态,从而避免分布式事务产生不一致的情况。具体来说,补偿机制通常包括两个方面的内容:


1.补偿操作:在发现分布式事务出现异常时,执行一系列的操作,使得分布式事务状态回滚到之前的状态,这个过程就称为补偿操作。补偿操作通常包括撤销之前的一些操作,例如将之前已经提交的事务进行回滚。


2.补偿流程:对于分布式事务中的每一个参与者节点,需要设计一个相应的补偿流程,用来保证在出现异常时,能够及时执行补偿操作。补偿流程通常包括检测异常情况、触发补偿操作等步骤。


补偿机制是实现分布式事务的重要手段之一。


分布式事务中的幂等性


在分布式事务中,幂等性指的是对于同一个操作,不论执行多少次,结果都是相同的。即对于一次操作,无论重复执行多少次,都只会产生一次结果。


幂等性问题的产生


  • 网络传输问题

由于网络传输的不确定性,可能会导致消息在传输过程中丢失、重复、乱序等问题,这个时候很多系统都会重试,从而引起幂等性问题。


  • 重试机制问题

在网络传输中,由于消息可能会丢失或者无法及时响应,系统可能会采取重试机制,重新发送消息。如果在消息处理过程中没有处理好幂等性问题,可能会导致重复执行操作。


  • 数据库操作问题

在数据库操作中,由于数据的并发访问,可能会导致数据出现重复插入或者更新的情况,从而引起幂等性问题。


  • 多个请求同时到达

在高并发场景中,多个请求同时到达,可能会导致重复执行操作的情况出现。


幂等性问题的影响


  • 数据重复:由于重复执行了操作,可能会导致数据的重复插入、更新等问题,从而影响数据的正确性和一致性。
  • 数据丢失:如果在执行操作时,未正确处理幂等性问题,可能会导致数据丢失。例如,在进行数据更新时,未对更新操作进行幂等性处理,可能会导致部分数据更新失败,从而导致数据丢失。
  • 系统不稳定:由于幂等性问题可能会导致重复执行操作,可能会导致系统资源的浪费,从而影响系统的稳定性。
  • 用户体验差:在幂等性问题出现时,可能会导致用户体验差,例如重复提交表单等问题。


如何解决幂等性问题


  • 唯一性约束:通过对操作的唯一性进行限制,保证统一操作只会执行一次。
  • 版本号:在每次执行操作时,对数据进行版本控制,确保只有最新的版本才会被执行。
  • 操作日志:通过记录操作的执行情况,避免重复执行相同的操作。


RocketMQ实现分布式事务


应用场景

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到小尤物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:


  • 主分支订单系统状态更新:由未支付更为支付成功
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录
  • 积分系统状态变更:变更用户积分,更新用户积分表
  • 购物车系统状态变更:清空购物车,更新用户购物车记录


基于Apache RocketMQ分布式事务消息:支持最终一致性


基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。


功能原理


事务消息是Apache RocketMQ提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。


  • Half Message,半消息(半事务消息)


暂时不能被Consumer消费的消息。Producer已经把消息发送到Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息成为半消息。事实上,该状态下的消息会被放在一个叫做RMQ_SYS_TRANS_HALF_TOPIC 的主题下。


当Producer端对它进行二次确认后,也就是Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。


  • 事务状态回查


可能会因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这个时候Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。


简而言之,RocketMQ事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。


事务消息处理流程


事务消息交互流程如下图所示:

1.生产者将消息发送至Apache RocketMQ服务端,发送这个半消息对于订阅者来说是不可见、不可消费的,必须等到步骤④才可以被消费。


2.Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。


3.生产者开始执行本地事务逻辑,修改订单的状态为已支付。


4.生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:


二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。


二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。


5.在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果(步骤④),或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查,回查逻辑需要自己实现,检查订单状态是否已经修改为已支付。


6.生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。


7.生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。


事务消息的生命周期


1.初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。


2.事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。


3.消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。


4.提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。


5.消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。


6.消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。


7.消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
    private static boolean checkOrderById(String orderId) {
        return true;
    }
    //演示demo,模拟本地事务的执行结果。
    private static boolean doLocalTransaction() {
        return true;
    }
    public static void main(String[] args) throws ClientException {
        ClientServiceProvider provider = new ClientServiceProvider();
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
        Producer producer = provider.newProducerBuilder()
                .setTransactionChecker(messageView -> {
                    /**
                     * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                     * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                     */
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
                        // 错误的消息,直接返回Rollback。
                        return TransactionResolution.ROLLBACK;
                    }
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            return;
        }
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

使用建议:


避免大量未决事务导致超时


Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。


正确处理"进行中"的事务


消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:


  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
  • 程序能正确识别正在进行中的事务。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
7月前
|
消息中间件 存储 监控
消息队列:分布式系统中的重要组件
消息队列:分布式系统中的重要组件
|
7月前
|
消息中间件 Dubbo Java
Spring全家桶 、Dubbo、分布式、消息队列后端必备全套开源项目
基于 Spring Boot 2.X 版本的深度入门教程。 市面上的 Spring Boot 基础入门文章很多,但是深度入门文章却很少。对于很多开发者来说,入门即是其对某个技术栈的最终理解,一方面是开发者“比较懒”,另一方面是文章作者把 Spring Boot 入门写的太浅,又或者不够全面。
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
4月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
4月前
|
消息中间件 存储 Java
分布式消息队列基础知识
本文概述了分布式消息队列的基本概念、组成、模式、基础与高级功能,以及它在业务开发中的应用和核心技术,为深入学习RocketMQ等消息队列组件提供基础知识。
分布式消息队列基础知识
|
4月前
|
消息中间件 存储 监控
消息队列在分布式系统中如何保证数据的一致性和顺序?
消息队列在分布式系统中如何保证数据的一致性和顺序?
|
5月前
|
消息中间件 缓存 架构师
对抗软件复杂度问题之降低代码的复杂度,如何解决
对抗软件复杂度问题之降低代码的复杂度,如何解决

热门文章

最新文章