如何通过事务消息保障抢购业务的分布式一致性?

简介: 作者:山猎,阿里云解决方案架构师

前言



在电商领域,抢购和秒杀是非常普遍业务模式,抢购类业务在快速拉升用户流量并为消息者带来实惠的同时,也给电商系统带来了巨大考验。在高并发、大流量的冲击下,系统的性能和稳定性至关重要,任何一个环节出现故障,都会影响整体的购物体验,甚至造成电商系统的大面积崩溃。和电商领域抢购场景极为类似的业务模式还有很多,比如大型赛事和在线教育的报名系统,以及各类购票系统等。


针对抢购类业务在技术上带来的挑战,业界有一系列解决方案,通过不同维度来提升系统的性能与稳定性,包括动静分离、定时上架、异步处理、令牌队列、多级缓存、作弊行为侦测、流量防护、全链路压测等。


本文重点聚焦在如何确保抢购类业务的一致性上,分布式事务一直是IT界老大难的问题,而抢购业务所具备的高并发、大流量特征,更是成倍增加了分布式一致性的实现难度。以下将介绍如何通过事务消息构建满足抢购类业务要求的高性能高可用分布式一致性机制。


事务一致性原理回顾



事务是应用程序中一系列严密的操作,这一系列操作是一个不可分割的工作单位,它们要么全部执行成功,要么一个都不做。事务具有四个特征:原子性( Atomicity )、一致性( Consistency )、隔离性( Isolation )和持续性( Durability ),这四个特性简称为 ACID 特性。


在非并发状态下,保证事务的 ACID 特性是轻而易举的事情,如果某一个操作执行不成功,把前面的操作全部回滚就 OK 了。而在并发状态下,由于有多个事务同时操作同一个资源,对于事务 ACID 特性的保证就会困难一些,如果考虑得不周全,就会遇到如下几个问题:


  • 脏读:事务 A 读到了事务 B 还没有提交的数据。
  • 不可重复读:在一个事务里面对某个数据读取了两次,读出来的数据不一致。
  • 幻读:在一个事务对某个数据集用同样的方式读取了两次,数据集的条目数量不一致。


为了应对上述并发情况下出现的问题,就需要通过一定的事务隔离级别来解决。当事务的隔离级别越高的时候,上述问题发生的机会就越小,但是性能消耗也会越大。所以在实际生产过程中,要根据实际需求去确定隔离级别:


  • READ_UNCOMMITTED(读未提交):最低的隔离级别,可以读到未提交的数据,无法解决脏读、不可重复读、幻读中的任何一种。
  • READ_COMMITED (读已提交):能够防止脏读,但是无法解决不可重复读和幻读的问题。
  • REPEATABLE_READ (重复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可重复读的问题,但是幻读的问题还是无法解决。
  • SERLALIZABLE ( 串行化):最高的事务隔离级别,避免了事务的并行执行,解决了脏读、不可重复读和幻读的问题,但性能最低。


关系型数据库提供了对于事务的支持,能够通过不同隔离级别的设置,确保并发状态下事务的ACID特性。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就需要通过更多其他技术手段来解决问题。


抢购业务中的分布式事务



有如下三种情况可能会产生分布式事务:


1、一个事务操作包含对两个数据库的操作:数据库所提供的事务保证仅能局限在对于自身的操作上,无法跨越到其他数据库。


image.png

Transaction

DBA

Service

DBB

2、一个事务包含对多个数据分片的操作:具体的分片规则由分库分表中间件或者分库分表 SDK 来实现,有可能跨越多个数据库或同一个数据库的多个表。对于业务逻辑而言,底层的数据分片情况是不透明的,因此也没有办法依赖于数据库提供的单机事务机制。

image.png

Transaction

DBB分片1

Service

DBB分片2

3、一个事务包括对多个服务的调用:在微服务领域,这是极为常见的场景,不同的服务使用不同的数据资源,甚至涉及到更为复杂的调用链路。在这种情况下,数据库提供的单机事务机制,仅仅能保证其中单一环节的 ACID 特性,没有办法延伸到全局。

image.png

Transaction

DB

ServiceB

DB

ServiceA

ServiceC

DB

DB


微服务技术在电商领域的普及程度是非常高的,比较大型的电商应用还会通过中台思想将共性业务能力进行沉淀,因此抢购业务中的很多环境都属于跨服务的分布式调用,会涉及到上述第三种分布式事务形态。比如在订单支付成功后,交易中心会调用订单中心的服务把订单状态更新,并调用物流中心的服务通知商品发货,同时还要调用积分中心的服务为用户增加相应的积分。如何保障分布式事务一致性,成为了确保抢购业务稳定运行的核心诉求之一。image.png

上层业务

上层业务

上层业务

介介介

业务支撑

母母母留母

积分中心

订单中心

库存中心

会员中心

交易中心

DB

DB

DB

DB

DB

共享服务中心


分布式事务的实现方式


传统分布式事务


传统的分布式事务通过 XA 模型实现,通过一个事务协调者,站在全局的角度将多个子事务合并成一个分布式事务。XA 模型之所以能在分布式事务领域得到广泛使用,是因为其具有如下两个方面的优势:


  • 提供了强一致性保证,在业务执行的任何时间点都能确保事务一致性。
  • 使用简单。常见的关系型数据库都提供了对XA协议的支持,通过引入事务协调器,业务代码跟使用单机事务相比基本上没有差别。


但是在互联网领域,XA 模型的分布式事务实现存在很多局限性,在抢购业务这样的高并发大流量场景中更是被完全弃用。我们拿 XA 分布式协议中最普遍的两阶式提交方案,来说明为什么 XA 模型并不适合互联网场景。

image.png

PhaseII

PhaseI

commit

commit

prepare

yesno

m

2

1

2

3


1、性能问题。在两段式提交的执行过程中,所有参与节点都是事务阻塞型的,需要长时间锁定资源。这会导致系统整体的并发吞吐量变低,在抢购业务中是不可接受的。

2、单点故障问题。事务协调者在链路中有着至关重要的作用,一旦协调者发生故障,参与者会一直阻塞下去,整个系统将无法工作,因此需要投入巨大的精力来保障事务协调者的高可用性。

3、数据不一致问题。在阶段二中,如果协调者向参与者发送 commit 请求之后,发生了网络异常,会导致只有一部分参与者接收到了 commit 请求,没有接收到 commit 请求的参与者最终会执行回滚操作,从而造成数据不一致现象。在抢购业务中,这样的数据不一致有可能会对企业或消费者造成巨大的经济损失。


因此 XA 模型是一个理想化的分布式事务模型,并没有考虑到高并发、网络故障等实际因素,即便是在两阶段提交的基础上,诞生了三阶段提交这样的实现方式,也没有办法从根本上解决性能和数据不一致的问题。


柔性事务


针对传统分布式事务方案在互联网领域的局限性,业界提出了 CAP 理论以及 BASE 理论,在此基础上诞生了在大型互联网应用中广泛使用的柔性事务。柔性事务的核心思想是放弃传统分布式事务中对于严格一致性的要求,允许在事务执行过程中存在数据不一致的中间状态,在业务上需要容忍中间状态的存在。柔性事务会提供完善的机制,保证在一段时间的中间状态后,系统能走向最终一致状态。


遵循 BASE 理论的柔性事务放弃了隔离性,减小了事务中锁的粒度,使得应用能够更好的利用数据库的并发性能,实现吞吐量的线性扩展。异步执行方式可以更好地适应分布式环境,在网络抖动、节点故障的情况下能够尽量保障服务的可用性。因此在高并发、大流量的抢购业务中,柔性事务是最佳的选择。



传统分布式事务 柔性事务
业务改造
一致性 强一致性 最终一致
回滚 支持     实现回退接口
隔离性 支持     放弃隔离性或实现资源锁定接口
并发性能     低    
适合场景     低并发、短事务 高并发、长事务


柔性事务有多种实现方式,包括TCC、Saga、事务消息、最大努力通知等,本文将重点介绍通过事务消息实现柔性事务。


事务消息原理分析


抢购业务场景拆解


我们结合抢购业务的真实场景,分析如何通过事务消息实现分布式一致性。在抢购业务中,有两个非常关键的阶段,需要引入分布式事务机制,分别是订单创建阶段和付款成功阶段。


从字面含义来看,抢购业务就隐含了一个重要的前提:库存是有限的。因此在订单创建的时候,需要预先检查库存情况,并相对应的库存进行锁定,以防止商品超卖。如果库存锁定操作失败,代表库存不足,必须确保订单不能被成功创建。在锁定库存后,如果因为某种异常情况导致订单创建失败,必须及时将之前锁定的库存进行释放操作,以便让其他用户可以重新争夺对应的商品。


如果抢购系统实现了购物车机制,在订单创建的同时,则需要从购买车中将相应的条目删除。

image.png


基于微服务架构的业务拆分,订单创建阶段的 3 个行为很有可能通过 3 个不同的微服务应用完成,因此需要通过分布式事务来保证数据一致性。


订单创建完成后,会等待用户付款,一旦付款成功,就会触发付款成功阶段的执行逻辑。这个阶段同样是通过分布式事务来完成,包含修改订单状态、扣减库存、通知发货、增加积分这4个子事务,它们要么全部不执行,要么全部执行成功。

image.png

修改订单状态

扣减库存

通知发货

增加积分

付款完成阶段


当然,在真实的抢购业务中,情况有可能会更加的复杂,本文列出的只是其中最具代表性的几类业务行为。

引入消息异步通知机制


传统的分布式事务存在一个很大的弊端是参与节点都是事务阻塞型的,需要长时间锁定资源。以锁定库存 ->创建订单这个流程为例,借助于 Redis 等缓存系统,单纯锁定库存的操作只需要花费毫秒级的时间,可以承载非常高的并发量。但如果把创建订单的操作也考虑进来,加上不同微服务应用之间相互通讯的时候,整体耗时有可能超过1秒,导致性能急剧下降。


假设存在一种异步消息机制,让分布式事务的第一个参与方在执行完本地事务后,通过触发一笔消息通知事务的其他参与方完成后续的操作,就能将大事务拆解为多个本地子事务来分开执行。在这种模式下,事务的多个参与方之间之间并不需要彼此阻塞和等待,就能极大程度地提升并发吞吐能力。对于库存中心而言,在高并发场景下,只需要不断的执行锁定库存记录操作,并不断通过异步消息通知订单中心创建订单,只要异步消息机制能确保消息一定送达,并得到正确处理,就能够实现分布式最终一致性。

image.png

锁定库存记录

耗时:1ms

库存中心

事务时间顺序序

异步消息

创建订单

耗时:15ms

订单中心


先执行本地事务,还是先发送异步消息?


在这个模型中,异步消息的发送交给了分布式事务的第一个参与方来完成,这个参与方就拥有了两个职责:执行本地事务发送异步消息。到底应该先执行本地事务,还是先发异步消息呢?


第一种方案是先发送异步消息,再执行本地事务。这样做肯定是不行的,如果本地事务没有执行成功,异步消息已经发出去了,其他事务参与方收到消息后会执行对应的远程事务,造成数据不一致。


第二种方案是先执行本地事务,再发送异步消息。这样做能够解决本地事务执行失败导致的数据不一致问题,因为只有在本地事务执行成功的情况下,才会发送异步消息。但如果事务的参与方在执行本地事务成功后,自己宕机了,就再有没有机会发送异步消息了,因此这样做同样会造成数据不一致的问题。记住:在真实场景中,任何一个应用节点都不是 100% 可靠的,都存在宕机的可能性。


一个可行的方案是引入可以处理事务消息的消息队列集群,用于异步消息的中转。一个事务消息包含两种形态:


1、首先,事务的参与方发送一笔半事务消息到消息队列,表示自己即将执行本地事务,消息队列集群在收到这个半事务消息后,不会马上进行投递,而是进行暂存。


2、在执行完本地事务后,事务的参考方再发送一笔确认消息到消息队列集群,告知本地事务的执行状态。如果本地事务执行成功,消息队列集群会将之前收到的半事务消息进行投递;如果本地事务执行失败,消息队列集群直接删除之前收到的半事务消息,这样远程事务就不会被执行,从而保证了最终一致性。


同样,如果事务参与方在执行完本地事务后宕机了怎么办呢?这就需要消息队列集群具备回查机制:如果收到半事务消息后,在特定时间内没有再收到确认消息,会反过来请求事务参与方查询本地事务的执行状态,并给予反馈。这样,即便错过了确认消息,消息队列集群也有能力了解到本地事务的执行状态,从而决定是否将消息进行投递。


在一个微服务应用中,会存在多个对等的应用实例,这也就代表着即便一个事务参与方的实例在执行完本地事务后宕机了,消息队列集群依然可以通过这个实例的兄弟实例了解到本地事务执行的最终状态。

image.png

发送半事务消息

执行本地事务

2

发送确认消息

消息队列集群

检查本地事务执行状态

巴巴图

应用实例

应用实例

应用实例

消息发送方

如何确保远程事务能执行成功?


如果一切本地事务的执行,以及异步消息的投递都一切顺利的话,接下来还会存在另外两种数据不一致的可能性:


  • 消息队列集群在将异步消息投递到远程事务参与方的时候,由于网络不稳定,消息没能投递成功。
  • 消息投递成功了,但远程事务参与方还没来得及执行远程事务,就宕机了。


这两种情况都会导致远程事务执行失败,所以需要建立一种消息重试机制,让远程事务参与方在完成任务后(实际上对远程事务参与方而言,这个任务是它要执行的本地任务),给予消息队列集群一个反馈,告知异步消息已经得到了正确的处理。否则,消息队列会在一定时间后,周期性的重复投递消息,直到它收到了来自远程事务参与方的反馈,以确保远程事务一定能执行成功。

image.png


和事务回查机制类似,远程事务参与方也有多个对等的微服务实例,即便某个实例在没来得及执行远程事务的时候宕机,消息队列也可以将任务交给这个实例的兄弟实例来完成。

完整流程


image.png

7.根据事务的状态Commit/Rollback

-1.发送半事务消息

3.执行

Commit:

本地事务

消息订阅方

消息发送方

服务端

2.半事务消息发送成功

本地事务

投递消息

.Commit或者Rollback

Rollback:

不投递消息,存

6.检查本地事务的状态

储三天后删除

5.未收到4的确认时,回查事务状态


事务消息实战



了解到事务消息的原理后,我们不难得出一个结论:消息队列集群在整个流程中起着至关重要的作用,如果消息队列集群不可用,所有涉及到分布式事务的业务都将中止!因此,我们需要一个高可用的消息队列集群,能够始终保持在工作状态,即便其某个组件出现故障,也能够在短时间内自动恢复,不会影响业务,还能确保接收到的消息不丢失。


消息队列 RocketMQ


消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。该产品最初由阿里巴巴自研并捐赠给 Apache 基金会,服务于阿里集团 13 年,覆盖全集团所有业务,包括种类金融级场景。作为双十一交易核心链路的官方指定产品,支撑千万级并发、万亿级数据洪峰,历年刷新全球最大的交易消息流转记录。

image.png

构建分布式应用的基础设施,消息传递无边界

Apache

生态丰富

便捷的运维体系

APACHE'

RocketMQ

THEADACHeSOfwrEFOundAonAoucesApacHeRockeMomsaTop

消息类型:定时消息,顺序消息,事务消息

消息回溯,死信队列,监控告警

LevelProject

多维度消息查询,全链路消息轨迹

多种订阅:Pub/sub,Tag过滤,SQL过滤

云产品集成:Lo,大数据,音视频等各领域

主子账号,TLS传输加密

L井s

网史巴巴

1众日保脸

天弘享全

性能优越

服务健壮

1.N

天猫

SNIE

聚石塔

海量消息堆积能力

99.99%服务可用性

毫秒级端到端延迟

99.99999999%数据可靠性

千万级高并发处理能力

熔断机制,消息重投机制

中国电让

芒东tV

中国税漆

ANICom中国


阿里云消息队列 RocketMQ 提供了对于事务消息机制最完整实现,包括半事务消息、确认消息、事务回查机制、消息重试等重要功能。此外,消息队列 RocketMQ 还提供了极强的高可用能力以及数据可靠性,可以确保在各种极端场景下都能提供稳定的服务,并确保消息不丢失。


对于开发者而言,使用云上的消息队列 RocketMQ,可以免除消息队列集群的搭建和维护工作,将更多的精力投入到实现业务逻辑的工作中。当消息队列集群的性能不能满足要求时,还可以非常方便的进行集群一键扩容,以获得更高的并发吞吐量。


开通 RocketMQ 服务


在阿里云官方网站开通消息队列服务后方可开始使用消息队列 RocketMQ,如果使用RAM用户访问RocketMQ,还必须先为RAM用户进行授权。在完成阿里云账户注册以及实名认证后,打开消息队列 RocketMQ 版产品页(https://www.aliyun.com/product/rocketmq),点击 免费开通,页面跳转至消息队列 RocketMQ 版控制台,在弹出的提示对话框中,完成 RocketMQ 服务的开通。



接下来,登录 RAM 控制台(https://signin.aliyun.com/1880770869023420.onaliyun.com/login.htm),在左侧导航栏选择 人员管理 > 用户,在 用户 页面,单击目标 RAM 用户 操作列 添加权限,在 添加权限 面板,单击需要授予 RAM 用户的权限策略,单击 确定。消息队列 RocketMQ 提供多种系统策略,可以根据权限范围为RAM用户授予相关权限。为了简单起见,我们先开通 AliyunMQFullAccess 权限策略,授予该 RAM 用户所有消息收发权限和控制台所有功能操作权限。

创建资源


在调用 SDK 收发消息前,需在消息队列 RocketMQ 控制台创建相关资源,在调用SDK时需填写这些资源信息。首先,我们要创建 RocketMQ 实例,实例是用于消息队列 RocketMQ 服务的虚拟机资源,相当于一个独立的消息队列集群,会存储消息 Topic 和客户端 Group ID 信息。


我们还需要注意,只有在同一个地域下的同一个实例中的 Topic 和 Group ID 才能互通,例如,某 Topic 创建在华东1(杭州)地域的实例 A 中,那么该 Topic 只能被在华东1(杭州)地域的实例 A 中创建的 Group ID 对应的生产端和消费端访问。


登录到消息队列 RocketMQ 控制台(http://ons.console.aliyun.com),在左侧导航栏,单击 实例列表,在顶部菜单栏,选择地域,如 华东1(杭州),在 实例列表 页面,单击 创建实例,在 创建 RocketMQ 实例 面板,完成实例的创建。


接下来,在实例所在页面的左侧导航栏,单击 Topic 管理。在 Topic 管理页面,单击 创建 Topic,在 创建 Topic面板,输入 名称描述,选择该Topic的 消息类型事务消息,完成Topic的创建。


Topic 是消息队列 RocketMQ 版里对消息的一级归类,例如创建 Topic_Trade 这一 Topic 来识别交易类消息,消息生产者将消息发送到 Topic_Trade,而消息消费者则通过订阅该 Topic 来获取和消费消息。

image.png

创建Topic

标准版实例中创建的Topic每天将产生少量费用!

标准实例将根据的调用次数和]o资占用进行费,请及时抽降不需要的10p资课,购买资识包优重,击这里

了解更多计费信息.

0/64

名称:

长度限制为3~64个字将,只能包含英文,数字,短横线(-)以及下划线().

0/128

描述:

事务消息

消息类型:

分区顺序消息

普通消息

全局顺序消息

定时/延时消息

事务消息提供类似xiOpenXA的分布事务功能,通过事务消息能达到分布式事务的最终一

i

致.点击这里查看更多参考.


创建完实例和 Topic 后,需要为消息的消费者和或生产者创建客户端 ID,即 Group ID 作为标识。在事务消息的场景中,需要创建 2 个不同的 Group ID,分别代表本地事务参与方和远程事务参与方。在实例所在页面的左侧导航栏,单击 Group 管理,在 Group 管理 页面,选择 TCP 协议 > 创建 Group ID,在 创建可用于 TCP 协议的 Group 面板,完成本地事务客户端 Group ID 的创建。重复此操作,完成远程事务参与方 Group ID 的创建。

image.png

本地事务参与方的业务代码


本文将通过 Java 代码介绍如何实现事务消息相关的业务逻辑,为了简化业务逻辑,我们继续基于 锁定库存 - > 创建订单 这个流程来演示,在这个流程中,仅有 2 个事务参与方。


1、初始化 TransactionProducer


我们先通过 Maven 引入消息队列 RocketMQ 的 SDK,优先使用阿里云官方提供的 TCP 版 SDK。

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.7.2.Final</version>
</dependency>

顺利引入 Log4j2 用于日志输出。

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.7</version>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-slf4j-impl</artifactId>
  <version>2.13.1</version>
</dependency>


在库存中心的代码中,我们需要初始化一个TransactionProducer,用于异步消息的发送,需要填入如下信息:


  • Group ID:之前创建的用于本地事务参与方的 Group ID。
  • Access key和Secret Key:RAM 用户对应的密钥信息,从 RAM 用户控制台获得。
  • Nameserver Address:RocketMQ 实例的接入点信息,从 RocketMQ 控制台获得。
Properties properties = new Properties();
// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
    new LocalTransactionCheckerImpl());
producer.start();


TransactionProducer 是线程安全的,启动后能在多线程环境中复用。


2、获取全局唯一的交易流水号


在发送半事务消息以及执行本地事务之前,我们需要先获取一个全局唯一的交易流水号,订单与交易流水号一一对应,接下来的事务消息机制都会依赖于这个这个交易流水号。我们可以通过引入第三方 ID 生成组件,或者在本地通过 Snowflake 算法实现。


3、实现本地事务回查逻辑


创建一个实现了LocalTransactionChecker接口的LocalTransactionCheckerImpl类,实现其中的check(Message)方法,该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文讨论的范围之前,我们将其封装在BusinessService类中。

package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
    private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);
    private static BusinessService businessService = new BusinessService();
    @Override
    public TransactionStatus check(Message msg) {
        // 从消息体中获得的交易ID
        String transactionKey = msg.getKey();
        TransactionStatus transactionStatus = TransactionStatus.Unknow;
        try {
            boolean isCommit = businessService.checkbusinessService(transactionKey);
            if (isCommit) {
                transactionStatus = TransactionStatus.CommitTransaction;
            } else {
                transactionStatus = TransactionStatus.RollbackTransaction;
            }
        } catch (Exception e) {
            LOGGER.error("Transaction Key:{}", transactionKey, e);
        }
        LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());
        return transactionStatus;
    }
}

4、执行本地事务并发送异步消息


我们先组装一条异步消息,其中包含了全局交易 ID,消息将要发往的 Topic,以及消息体。远程事务参与方将通过这个消息体中获取执行远程事务所必须的数据信息。


接下来,将这条异步消息连同一个实现了LocalTransactionExecuter接口的匿名类,通过send方法进行发送,这就是本地事务参与方所需要实现的所有业务代码了。当然,这个匿名类实现了TransactionStatus execute.execute()方法,其中包含了对于本地事务的执行。完整代码如下:

package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TransactionProducerClient {
    private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
    private static final BusinessService businessService = new BusinessService();
    private static final String TOPIC = "create_order";
    private static final TransactionProducer producer = null;
    static {
        Properties properties = new Properties();
        // 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        // LocalTransactionCheckerImpl本地事务回查类的实现
        TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                new LocalTransactionCheckerImpl());
        producer.start();
    }
    public static void main(String[] args) throws InterruptedException {
        String transactionKey = getGlobalTransactionKey();
        String messageContent = String.format("lock inventory for: %s", transactionKey);
        Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());
        try {
            SendResult sendResult = producer.send(message, (msg, arg) -> {
                // 此处用Lambda表示,实际是实现TransactionStatus execute(final Message msg, final Object arg)方法
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean localTransactionOK = businessService.execbusinessService(transactionKey);
                    if (localTransactionOK) {
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    LOGGER.error("Transaction Key:{}", transactionKey, e);
                }
                LOGGER.warn("Transaction Key:{}", transactionKey);
                return transactionStatus;
            }, null);
            LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);
        } catch (Exception e) {
            LOGGER.info("send message failed, Transaction Key:{}", message.getKey());
        }
        // demo example防止进程退出
        TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
    }
    private static String getGlobalTransactionKey() {
        // TODO
        return "";
    }
}

得益于 RocketMQ SDK 优秀的封装,发送半事务消息、发送确认消息、事务回查等重要步骤都已经完整实现,不需要开发者再编写代码了,这将为用户带来特别顺畅开发体验。


远程事务参与方的业务代码


相对本地事务参与方而言,远程事务参与方的代码更加简单,只需要从异步消息中提取出对应信息,完成对远程事务的执行即可。

package transaction;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class TransactionConsumerClient {
    private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
    private static final BusinessService businessService = new BusinessService();
    private static final String TOPIC = "create_order";
    private static final Consumer consumer = null;
    static {
        Properties properties = new Properties();
        // 在控制台创建的Group ID,不同于本地事务参与方使用的Group ID
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.start();
    }
    public static void main(String[] args) {
        consumer.subscribe(TOPIC, "*", (message, context) -> {
                    LOGGER.info("Receive: " + message);
                    businessService.doBusiness(message);
                    // 返回CommitMessage,代表给予消息队列集群异步消息已经得到正常处理的回馈
                    return Action.CommitMessage;
                }
        );
    }
}

事务回滚


是否存在这样的情况:当本地事务执行成功后,因为远程事务没有办法执行,而导致本地事务需要进行回滚操作呢?在事务消息原理分析一节,我们已经介绍过如何通过消息重试,确保远程事务能够执行成功,这是不是已经说明只要异步消息被确认,远程事务就一定可以执行成功,从而不存在对本地事务的回滚呢?


实际生产情况下,确实存在远程事务无法正常执行的情况。比如在付款成功阶段,当本地事务“修改订单状态”执行完成后,在执行远程事务“通知发货”的时,因为订单地址有误而被物流公司拒绝,这种情况下就必须对订单状态进行回退操作,并发起退款流程。


所以在执行远程事务的时候,我们有必要区分如下两种完全不同的异常:


  • 技术异常:远程事务参与方宕机、网络故障、数据库故障等。
  • 业务异常:远程逻辑在业务上无法执行、代码业务逻辑错误等。


简单来讲,当远程事务执行失败的时候,能够通过消息重试的方式解决问题的,属于技术异常;否则,属于业务异常。基于事务消息的分布式事务机制不能实现自动回滚,当业务异常发生的时候,必须通过回退流程确保已经完成的本地事务得到恢复。比如在修改订单状态 -> 通知发货这个场景中,如果由于业务异常导致无法发货的时候,需要通过额外的回退流程,将订单状态设置为“已取消”,并执行退款流程。

image.png

正常流程:订单设置为"已发货"后,通知发货

消息队列

物流中心

订单中心

集群

回退流程:没有办法发货的情况下,修改订单状态为"它取消",并发起退款流程


在事务消息机制中,回退流程相当于远程事务参与方和本地事务参与方调换了角色,和正常流程一样,同样也可以通过事务消息来完成分布式事务。由于正常流程和回退流程的业务逻辑是完全不一样的,所以最理想的方式是建立另外一个 Topic 来实现。这也就说明,我们在创建事务消息 Topic 的时候,要充分考虑到这个 Topic 背后的业务含义,并在 Topic 命名上尽可能的与真实业务相匹配


多个事务参与方


本节展示的示例中,都只涉及到 2 个事务参与方,但在真实世界中,分布式事务往往涉及到更多的事务参与方,比如之前提到的付款成功环节,有修改订单状态->扣减库存->通知发货->增加积分这 4 个需要同时进行的操作,涉及到 4 个事务参与方。这种情况下如何通过事务消息来实现分布式事务呢?


我们依然可以继续使用之前的架构,只需要加入多个远程事务参与方就行了。可以通过 RocketMQ 的多消费组关联多个远程事务参与方,每一个参与方对应一个 Group ID,在这种情况下,同一个异步消息会复制成多份投递给不同的事务参与方。


image.png

GroupID三A

库存中心

远程事务1

本地事务执行成功,

发送异步消息

GroupID-B

远程事务2

物流中心

订单中心

消息队列

集群

远程事务3

GroupID二C

积分中心

需要特别引起注意的是,当某个远程事务参与方遇到业务异常的时候,需要通知其他所有事务参与方执行回退流程,这无疑会增加业务逻辑的整体复杂度。为了简化事务消息的执行流程,我们可以对业务逻辑预先进行梳理,将子事务分为如下两类:


  • 有可能发生业务异常的:比如锁定库存的操作,有可能因为库存不足而执行失败。又比如扣除积分的操作,有可能因为用户积分不足而无法扣除。
  • 不太可能发生业务异常的:比如删除购物车条目的操作,除非是技术类故障,一定可以执行成功,即便对应的条目并不存在,也没有关系。又比如积分增加的操作,只要对应的用户没有注销,是不可能遇到业务异常的。


我们尽量将第一类事务作为本地事务而实现,将第二类事务作为远程事务而实现,这样就可以最大程度避免回退流程。

image.png

GroupID三A

库存中心

GroupID-B

GroupID-B

发货失败,通知所有

其他事务参与方执行

消息队列

物流中心

订单中心

回退流程

集群

正常流程

GroupID二C

回退流程

积分中心


其他注意事项


消息幂等


RocketMQ 能保证消息不丢失,但不能保证消息不重复,所以消费者在接收到消息后,有必要根据业务上的唯一 Key 对消息做幂等处理。在抢购业务中,唯一 Key 当然就是全局唯一的交易流水号,具体幂等处理方法在互联网上有很多文章供读者参考。


当然,不是每一种业务远程事务都需要确保消息的幂等性,比如删除购物车指定条目这样的操作,在业务上是可以容忍多次反复执行的,就没有必要引入额外的幂等处理了。

每日对账


不同于传统事务的强一致性保证,柔性事务需要经历一个中间状态,才到达成事务的最终一致性。有某些特殊情况下,这个中间状态会持续非常长的时间,甚至需要人工主动介入才能实现最终一致性:


1、消息重试多次后,依然不成功:当消费者完全无法正常工作的时候,RocketMQ 不可能永无止境地重试消息,事实上,如果16次重试后异步消息依然没有办法被正常处理,RocketMQ 会停止尝试,将消息放到一个特殊的队列中。

2、未处理的业务异常:比如给某个账号加积分的时候,发现此账号被注销了,这是一个非常罕见的业务现象,有可能事先对此并没有健壮的处理机制。

3、幂等校验失败:处理幂等所依赖的系统比如 Redis 发生了故障,导致某些消息被重复处理。

4、其他严重的系统故障:比如网络长时间中断,留下了大量执行到一半的事务。

5、其他漏网之鱼。


这些情况下,我们都有需要通过定期对账机制来进行排查,在必要的时候发起人工主动介入流程,修复不一致的数据。事实上,在任何柔性事务的实现中,每日对账都是必不可少的数据安全保障性手段。


总结



在柔性事务的多种实现中,事务消息是最为优雅易用的一种。基于阿里云 RocketMQ 高性能、高可用的特点,完全可以胜任抢购业务这类高并发大流量的场景。在阿里巴巴自身的业务中,事务消息也广泛使用于双 11 这样的大型营销活动中,有着非常高的通用性。


但在IT领域,没有任何一种技术是银弹,引入事务消息机制需要针对性的修改业务逻辑,还需要借助于每日对账等额外的手段确保数据安全,在实现高性能的同时,也增加了整体的业务复杂度。我们需要对业务场景进行充分评估,对比多种不同的技术实现方案,从中挑选与自身业务特点最为匹配的一种,才能更好地发挥柔性事务的价值。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
2月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
12天前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
1月前
|
监控
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
|
3月前
Saga模式在分布式系统中如何保证事务的隔离性
Saga模式在分布式系统中如何保证事务的隔离性
|
2月前
|
消息中间件 缓存 算法
分布式系列第一弹:分布式一致性!
分布式系列第一弹:分布式一致性!
|
2月前
|
算法 Java 关系型数据库
漫谈分布式数据复制和一致性!
漫谈分布式数据复制和一致性!
|
4月前
|
存储 NoSQL 算法
Go 分布式令牌桶限流 + 兜底保障
Go 分布式令牌桶限流 + 兜底保障
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
12天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
35 5