引言
相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL
的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL
所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。
例如一个下单业务中,假设由「扣减库存、新增订单」两个操作组成,在单库中通过
MySQL
提供的事务机制,能够确保该事务中任意操作执行出现问题时,另一个操作变更的数据可以回滚,从而确保整库数据的一致性,避免产生库存扣了,但订单却未增加的情况出现。
在传统的单体架构中做单库开发,数据的一致性可以通过InnoDB
事务机制来保障,但当项目换到分布式架构的环境时,或者当项目换到分库分表的环境时,答案亦是如此吗?并非如此,在分布式环境下,由于每个库都维护着自己的事务机制,相互之间无法感知对方的事务,因此就会出现分布式事务问题,这也是分布式系统中头疼多年的一个棘手问题!
本章的核心则是讲清楚分布式事务问题,以及该如何去解决这种棘手问题,但实际目前对于分布式事务的解决方案已经十分成熟了,即
Spring Cloud Alibaba
中的Seata
框架,以及更早期的GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...
等框架都能够很好的解决分布式事务问题。
也正由于分布式事务问题的解决方案已经比较完善,基本上一个注解、几行代码、几行配置的事情,就能够轻松解决分布式事务问题,因此本章并非单纯去讲述这些框架的基本使用,而是从另一个角度来思考分布式事务问题,即假设没有这些成熟的解决问题,咱们遇到这个问题时又该如何处理呢?接下来会与大家一起,手把手的自己编写一个分布式事务框架。
因为涉及到了分布式事务框架的手写,可能内容会比较偏向于底层原理的分享,我会尽量在把这些内容写的简单一点,同时对于每个核心段落也会画图示例,但本身这个题材就比较硬核,因此想要彻底读懂这章,最好具备如下基础:
- 分布式知识储备:主要是指
SpringCloud
微服务与RPC
远程调用的基本使用。 - 网络知识储备:主要是指
Netty
框架的使用、序列化知识、P2P
网络通信的原理。 Spring
相关的知识储备:主要是Transactional
事务机制原理、AOP
切面使用。Java-JUC
并发编程的知识储备:主要是ThreadLocal、Condition、Lock、
线程池技术。- 其他的知识储备:主要是指自定义注解式开发、
Maven
打包技术、MySQL
事务原理。
如若大家不具备上述基础,实则也无需担心,通篇读下来应该大致原理也能够弄懂,本章要做的是让诸位知其然并知其所以然,对技术不要停留在单纯的使用层面,而应该适当性的去参悟底层的实现原理,这才是大家与其他开发者拉开差距的核心竞争力。
因为个人还并未阅读过
Seata
框架的源码,因此本章是之前在阅读LCN
这个老牌分布式事务框架仿写的,所以很多实现是借鉴于LCN
的部分实现,但LCN
和Seata-AT
模式大致相同,因此诸位也可将本篇当做Seata-AT
模式的原理篇来阅读,在本章末尾也会提供源码实现。最后,如若你对于手写框架系列的内容感兴趣,那也可以看看之前曾发布过的《手写SpringMVC框架》这篇文章。
一、何谓分布式事务问题?
首先来聊聊啥是分布式事务问题,因为现在的分布式/微服务系统开发中,基本上每个核心服务都会具备自己的独享库,也就是垂直分库的模式,以前面的例子来说,订单服务有订单DB
,库存服务有库存DB
,每个服务之间的数据库都是独立的。此时先回顾原本单库环境中解决事务问题的方式,如下:
// 下单服务
@Transactional
public void placeAnOrder(){
// 调用扣减库存的方法
inventoryService.minusInventory();
// 调用增加订单的方法
orderService.insertOrder();
}
一个下单业务的伪代码如上,会先调用「扣减库存」的方法,接着再调用「新增订单」的方法,为了确保下单这组操作的数据一致性,通常会在方法上加一个@Transactional
注解,这样就会将事务托管给Spring
来负责,如下:
- 在该方法执行时,
Spring
会首先向数据库发送一条begin
开启事务的命令。 - 如果执行过程中出现异常,
Spring
会向数据库发送一条rollback
回滚事务的命令。 - 如果执行一切正常,
Spring
会向数据库发送一条commit
提交事务的命令。
Spring
注解式事务的逻辑图如下:
这种事务管理机制,在单体架构中显然十分好用,但放到分布式环境中,情况则不同,如下:
由于分布式系统都会根据业务去拆分子系统/子服务,因此不同业务之间只能通过RPC
的方式,远程调用对方所提供的API
接口,假设这里在库存服务本地的「扣减库存」方法上加一个@Transactional
注解,同时在订单服务本地的「新增订单」方法也加一个@Transactional
注解,Spring
内部的处理逻辑如下:
- 下单业务远程调用「减库存」接口时,
Spring
会先向库存DB
发送一个begin
命令开启事务。 - 当扣减库存的业务执行完成后,
Spring
会直接向库存DB
发送一个commit
命令提交事务。 - 下单业务调用本地的「新增订单」方法时,
Spring
又会向订单DB
发送begin
命令开启事务。 - 当新增订单执行出现异常时,
Spring
会向订单DB
发送一个rollback
命令回滚事务。
此时分析如上场景,下单业务理论上应该属于同一个事务,但之前《MySQL事务篇》聊到过,InnoDB
的事务机制是基于Undo-log
日志实现的,那么减库存产生的回滚记录会记录到库存DB
的Undo-log
中,而新增订单产生的回滚记录则会记录到订单DB
的Undo-log
中,此时由于服务不同、库不同,因此相互之间无法感知到对方的事务。
当后续「新增订单」的操作执行出现异常,
Spring
框架发送的rollback
命令,就只能根据订单DB
中的回滚记录去还原数据,此时前面扣减过的库存数据就无法回滚,因此导致了整体数据出现了不一致性。
1.1、分布式事务问题演示
前面简单讲述了分布式事务问题,但这样讲起来似乎有些令人费脑,那接下来直接上个案例,实际感受一下分布式事务造成的数据不一致问题,这里基于SpringCloud
快速搭建了一个微服务项目,为了节省篇幅就不带着诸位一起走简单的搭建流程了,完整的源码地址会在最后给出,其中有订单、库存两个子服务,库存服务提供了一个减库存的接口,如下:
@RestController
@RequestMapping("/inventory")
public class InventoryAPI {
// 注入本地的InventoryService
@Autowired
private InventoryService inventoryService;
@RequestMapping("/minusInventory")
public String minusInventory(Inventory inventory) {
// 根据传入的商品ID先查询库存
Inventory inventoryResult =
inventoryService.selectByPrimaryKey(inventory.getInventoryId());
// 如果库存不足则返回相应提示
if (inventoryResult.getShopCount() <= 0) {
return "库存不足,请联系卖家....";
}
// 如果商品还有剩余库存则对库存减一,接着修改数据库中的库存数量
inventoryResult.setShopCount(inventoryResult.getShopCount() - 1);
int n = inventoryService.updateByPrimaryKeySelective(inventoryResult);
System.out.println("库存信息:" + inventoryResult.toString());
// 扣减库存成功后,向客户端返回对应的提示
if (n > 0) {
return "端口:" + port + ",库存扣减成功!!!";
}
return "端口:" + port + ",库存扣减失败!!!";
}
}
// 库存服务本地的InventoryService实现类
@Service
public class InventoryServiceImpl implements InventoryService {
// 减库存会调用的修改方法,在上面添加了@Transactional注解
@Override
@Transactional
public Integer updateByPrimaryKeySelective(Inventory record) {
int i = inventoryMapper.updateByPrimaryKeySelective(record);
return i;
}
}
而订单服务中提供了一个下单接口,如下:
@RestController
@RequestMapping("/order")
public class OrderAPI {
// 注入本地的OrderService
@Autowired
private OrderService orderService;
// 库存服务的远程调用地址
private static final String URL_PREFIX =
"http://localhost:8002/inventory/minusInventory";
// 负责远程调用的RestTemplate
@Autowired
private RestTemplate restTemplate;
// 下单接口
@RequestMapping("/placeAnOrder")
public String placeAnOrder(){
// 随便指定一个商品的ID
String inventoryId = "92b1162a-eb7a-4d72-9645-dea3fe03c8e2";
// 然后通过HttpClient调用库存服务的减库存接口
String result = HttpClient.get(URL_PREFIX +
"/minusInventory?inventoryId=" + inventoryId);
System.out.println("\n调用减库存接口后的响应结果:" + result + "\n");
// 调用减库存接口成功后,向订单库中插入一笔订单记录
String orderId = UUID.randomUUID().toString();
Order order = new Order(orderId,"黄金竹子","8888.88",inventoryId);
Integer n = orderService.insertSelective(order);
System.out.println("\n\n\n" + n + "\n\n\n");
return "下单调用成功,需要处理事物.....";
}
}
// 订单服务本地的OrderService实现类
@Service
public class OrderServiceImpl implements OrderService {
// 新增订单会调用的插入方法
@Override
@Transactional
public Integer insertSelective(Order record) {
// 刻意制造出一个异常
int i = 100 / 0;
return orderMapper.insertSelective(record);;
}
}
要注意看,在orderService.insertSelective(order)
插入订单数据的方法中,我们通过100/0
手动制造了一个异常,以此来模拟出「扣减库存」执行成功、「新增订单」执行失败的场景,接着看看库存DB
、订单DB
中对应的库存表、订单表数据,如下:
很明显,目前订单表中还没有任何数据,而库存表中仅有一条测试数据,但要注意:这两张表分别位于db_inventory、db_order
两个不同的库中,此时「黄金竹子」的库存数量为100
,现在分别启动库存服务、订单服务来做简单模拟:
- 订单服务的下单接口:
http://localhost:8001/order/placeAnOrder
。
这里就直接用浏览器做测试,浏览器调用下单接口后,控制台的日志如下:
两个服务对应的数据库中的数据如下:
结果十分明显,此时对应商品的库存扣掉了,但由于新增订单时出现异常,所以订单却并未增加,最终造成了数据不一致问题,这也就是前面所说到的分布式事务问题,这也是分布式系统中,需要解决的一个棘手问题。
1.2、该如何解决分布式事务问题呢?
早年间分布式架构并不像如今这么主流,一般只有一些互联网大厂才会使用,因此相关的技术生态和解决方案,并不像那么成熟,而分布式事务问题,也成为了使用分布式架构不得不解决的棘手问题,在分布式事务问题被发现后,期间推出了各种各样的解决方案,但如今保留下来的主流方案共有四种:
- ①基于
Best Efforts 1PC
模式解决分布式事务问题。 - ②基于
XA
协议的2PC、3PC
模式做全局事务控制。 - ③基于
TTC
方案做事务补偿。 - ④基于
MQ
实现事务的最终一致性。
但上述四种仅是方法论,也就是一些虚无缥缈的理论,想要使用时还得根据其概念去自己落地,但如今分布式/微服务生态已经十分成熟,所以也有很多现成的落地技术,早已能够解决分布式事务问题,如Seata、GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...
等框架,都提供了完善的分布式事务支持,目前较为主流的是引入Seata
框架解决,其内部提供了AT、TCC、XA、Seaga-XA
四种模式,主推的是AT
模式,使用起来也较为简单,大体步骤如下:
- ①引入相关的
Maven
依赖。 - ②修改相关的配置文件。
- ③在需要保障分布式事务的方法上加一个
@GlobalTransactional
注解。
经过上述三步后,就能够轻松解决早期困扰大厂多年的分布式事务问题,是不是尤为轻松?其他的分布式事务框架使用步骤也相差无几,引入依赖、修改配置、加一个注解即可。
二、手写分布式事务框架的思路分析
前面对分布式事务问题的描述讲了一大堆,但真正要解决对应问题时,似乎仅靠一个注解就够了,这这这......,到底是怎么解决的呢?相信诸多使用Seata
或其他分布式事务框架的小伙伴,心中难免都会产生这个疑惑。OK,咱们先假设现在没有这些成熟的分布式事务框架,如果自己要解决分布式事务问题,又该如何去实现呢?那么接下来就重点说说这块,真正让大家做到知其然并知其所以然!
不过做任何事情得先有规划,没有提前做好准备与计划,任何事一般都不会有太好的结果,因此先分析一下手写分布式事务框架的思路,把思路捋清楚之后,接着再去编写对应的代码实现。
前面讲的分布式事务问题,本质原因在于Spring
是依靠数据库自身的事务机制来保障数据一致性的,而两个服务对应着两个库,两个库各自都维护着自己的事务机制,同时无法感知对方的事务状况,最终造成库存服务先提交了事务,而订单服务后回滚事务的情况出现。
所以想要自己解决分布式事务问题,首先就不能依靠
MySQL
自身的事务机制来解决问题,对于事务的管理必须要是全局性质的,也就是需要引入一个第三方来进行全局事务管理,而管理事务的这个角色,我们将其称之为事务管理者。
既然需要把事务交给第三者管理,那么每个参与全局事务的子服务,其事务的控制权必须要拿到,也就是不允许任何一个参与者私自提交或回滚事务,事务的控制权完全交给事务管理者,一组全局事务的结果到底是提交,还是回滚,这点全权由事务管理者决定。
到这里出现了两个角色:事务管理者、事务参与者,所谓的事务参与者,即代表参加一个全局事务的子服务,如前面的下单业务中,库存服务和订单服务,就可以理解成是两个事务参与者。
因为事务参与者要把自己的执行状态告知给管理者,同时管理者需要把事务的最终处理通知给每个参与者,所以管理者、参与者之间要能相互通信,所以等会儿会采用Netty
网络框架,实现点对点对端通信。
但为了不影响业务,也就是减小代码的侵入性,对于事务参与者而言,改动的代码量越小越好,所以这里可以用
Maven
构建一个本地工程,在这个工程中提供一个全局事务注解,然后将该工程打成一个依赖包,其他需要使用分布式事务的子服务,直接在pom.xml
中引入该依赖即可,涉及到分布式事务的方法,直接在业务方法上面加上注解。
OK,到这里分析出了大体步骤,大体的逻辑图如下:
有三个核心步骤:
- ①实现事务管理者,能够协调全局事务的参与者,等整个业务调用链执行结束后决定事务处理方案。
- ②实现事务参与者,能够向事务管理者中注册/加入全局事务、告知执行结果、接收最终处理方案。
- ③构建
Maven
工程打成依赖包,实现自定义注解,尽量对业务代码做到低入侵。
但上述仅是大体思路,下面来开始逐步实现各个步骤,每个步骤中的细节会慢慢展开。
三、手写分布式事务框架实战
将实现的大概思路弄清楚后,接着先来实现一下事务管理者,实现通过Netty
搭建一个服务端,如果对于Netty
还不熟悉的小伙伴,后续我会更新Netty
框架的文章,这里可以套用Java-NIO
的概念去理解,代码如下:
// 事务管理者的启动类
public class Main {
public static void main(String[] args){
// 这个是自定义的一个服务端
NettyServer nettyServer = new NettyServer();
// 为其绑定IP和端口号
nettyServer.start("localhost", 8080);
System.out.println("\n>>>>>>事务管理者启动成功<<<<<\n");
}
}
// Netty服务端 - 事务管理者
public class NettyServer {
// 启动类
private ServerBootstrap bootstrap = new ServerBootstrap();
// NIO事件循环组
private NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
// 启动方法
public void start(String host, int port) {
try {
// 调用下面的初始化方法
init();
// 绑定端口和IP
bootstrap.bind(host, port).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
// 初始化方法
private void init(){
bootstrap.group(nioEventLoopGroup)
.channel(NioServerSocketChannel.class)
// 添加一个自定义的处理器
.childHandler(new ServerInitializer());
}
// 关闭方法
public void close(){
nioEventLoopGroup.shutdownGracefully();
bootstrap.clone();
}
}
到这里相对来说还比较简单,就是创建了一个服务端,然后绑定了一个IP
和端口,最主要是上面NettyServer.init()
方法,在里面添加了一个自定义的处理器,该处理器代码如下:
// NIO的通道处理器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置编码器、解码器、处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new NettyServerHandler());
}
}
前面添加的是解码器和编码器,主要用于数据传输时的编解码工作,重点是要关注最后添加的这个处理器,这个处理器是自定义的,里面会编写处理分布式事务的核心逻辑,基础架构如下:
// 分布式事务的核心处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup channelGroup =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 事务组中的事务状态列表
private static Map<String, List<String>> transactionTypeMap = new ConcurrentHashMap<>();
// 事务组是否已经接收到结束的标记
private static Map<String, Boolean> isEndMap = new ConcurrentHashMap<>();
// 事务组中应该有的事务个数
private static Map<String, Integer> transactionCountMap = new ConcurrentHashMap<>();
// 把整个Channel加入到channelGroup中
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.add(channel);
}
/***
* 这里是待会儿实现分布式事务的核心
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 等会儿逐步实现
}
// 向客户端(事务参与者)发送最终处理结果的方法
private void sendResult(JSONObject result){
System.out.println("事务最终处理结果:" + result.toJSONString());
for (Channel channel : channelGroup){
channel.writeAndFlush(result.toJSONString());
}
}
}
在这个处理器类中,主要是四个地方需要理解,即里面的三个容器,两个方法:
transactionTypeMap
:记录一个事务组中,所有子事务的执行状态。isEndMap
:记录一个事务组,当前所有的子事务是否已抵达。transactionCountMap
:记录一个事务组,应该由多少个子事务组成。handlerAdded()
:事务管理者的核心方法,所有来自客户端的消息都会被该方法监听到。sendResult()
:这个方法会在一组事务全部执行完成后,用来给参与者返回处理结果使用。
OK,这里其实并非涉及到任何全局事务的处理,只是基于Netty
搭建了一个服务端结构,后续会一点点去展开实现,这样也能够更便于诸位理解,接着再准备着手实现事务参与者。
3.1、事务参与者的实现过程
前面说过,所谓的事务参与者其实就是指每个业务子服务,所以按理来说,事务参与者的代码实现应该放到库存服务、订单服务中,但这样会对业务服务的造成较高的代码侵入性,同时每个需要使用分布式事务的子服务,都需要复制一遍业务参与者的代码,难免有些表现的“。不太智能”。
因此这里将事务参与者的核心代码,单独拧出来构建一个
Maven
子工程,实现完成后将其打成依赖包放到本地仓库,然后需要使用分布式事务功能的子服务,只需要引入Maven
依赖即可(我这里将其命名为zhuzi-distributed-tx
,即代表「竹子爱熊猫」开发的一款分布式事务框架,哈哈哈,臭美一下~)。
3.1.1、Netty客户端的实现
作为事务参与者,因为要和事务管理者之间进行通信,而事务管理者的本质是一个Netty-Server
服务端,所以这里的事务参与者,本质就是Netty-Client
客户端,所以Netty
客户端的代码实现如下:
// Netty-Client客户端代码实现
@Component
public class NettyClient implements InitializingBean {
// 这个是事务参与者的核心处理器
private NettyClientHandler client = null;
private static ExecutorService executorService = Executors.newCachedThreadPool();
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("\n\n>>>>>>事务参与者启动成功<<<<<<\n\n");
start("localhost", 8080);
}
// 根据IP、端口地址,向服务端注册客户端
public void start(String host, int port) {
client = new NettyClientHandler();
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// 这里添加了一个自定义的处理器
.handler(new ClientInitializer(client));
try {
bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void send(JSONObject sendData) {
try {
// 调用处理器中向服务端发送数据的方法
client.sendData(sendData);
} catch (Exception e) {
e.printStackTrace();
}
}
}
结构基本上和事务管理者类似,首先有这么一个启动类,会在业务服务启动时,顺势一起伴随启动,接着会根据地址找到事务管理者并注册,而事务参与者这里同样有一个自定义的处理器,代码如下:
// 自定义的处理器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
private NettyClientHandler nettyClientHandler;
public ClientInitializer(NettyClientHandler nettyClientHandler) {
this.nettyClientHandler = nettyClientHandler;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 这里初始化的是客户端的处理器
pipeline.addLast("handler", nettyClientHandler);
}
}
// 事务参与者的核心处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext channelHandlerContext;
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channelHandlerContext = ctx;
}
// 所有服务端(事务管理者)返回的数据都会被该方法监听到
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 等会儿逐步实现
}
// 向服务端(事务管理者)发送数据的方法
public void sendData(JSONObject result){
System.out.println("向事物管理者发送数据:" + result.toJSONString());
channelHandlerContext.writeAndFlush(result.toJSONString());
}
}
在事务参与者的处理器中,同样存在两个方法:
channelRead()
:所有来自服务端(事务管理者)的消息都会被这个方法监听到。sendData()
:客户端(参与者)向服务端(管理者)发送数据的方法。
OK,看到这里相信有些对于Netty
不大熟悉的小伙伴会有些懵,那下面画一幅图来简单说明一下目前的结构,就算你不会Netty
框架也没关系,只要记住这幅图即可:
这里实则是基于Netty
搭建了一个客户端与服务端的通信架构,这样既能够让参与者向管理者注册事务,也能够让管理者向参与者返回结果,理解这个核心思想就OK
。
3.1.2、事务参与者的核心实现
搭建出管理者、参与者之间的基础通信架构后,接着来实现事务参与者的核心代码,先定义个枚举类:
// 事务类型的枚举类
public enum TransactionalType {
// 提交和回滚状态
commit, rollback;
}
这个枚举类会用来在后续判断事务状态,它会作为子事务对象的一个子属性,事务对象的定义如下:
// 分布式事务 - 子事务对象
public class ZhuziTx {
// 当前子事务属于哪个事务组
private String groupId;
// 当前子事务的事务ID
private String transactionalId;
// 当前子事务的事务类型
private TransactionalType transactionalType;
// 当前子事务的任务等待队列(基于此实现事务控制权)
private Task task;
public ZhuziTx(String groupId, String transactionalId, TransactionalType transactionalType) {
this.groupId = groupId;
this.transactionalId = transactionalId;
this.transactionalType = transactionalType;
this.task = new Task();
}
// 省略其他构造方法、以及get/set方法......
}
每个参与全局事务的子事务,在这里都会被封装为一个个ZhuziTx
对象,每个子事务具备下述几个属性:
groupId
:表示当前子事务属于那一组全局事务,因为同时可能存在多组分布式事务。transactionalId
:当前子事务的事务ID
值,具备全局唯一特性。transactionalType
:这个是前面的枚举类,表示当前子事务最终的执行状态(回滚/提交)。task
:这个目前暂时不会用到,后面用来实现事务提交或回滚的控制权。
了解这个基本的子事务对象后,接着再来看看事务参与者的核心实现类:
// 事务参与者的核心实现类
public class ZhuziTxParticipant {
// 获取前面伴随服务启动产生的NettyClient客户端
private static NettyClient nettyClient =
ApplicationContextProvider.getBean(com.zhuzi.distributedtx.netty.NettyClient.class);
// 存储当前线程在执行的子事务对象
private static ThreadLocal<ZhuziTx> current = new ThreadLocal<>();
// 存储当前子事务所属的事务组ID值
private static ThreadLocal<String> currentGroupId = new ThreadLocal<>();
// 存储当前子事务所属的事务组子事务总量
private static ThreadLocal<Integer> transactionalCount = new ThreadLocal<>();
// 事务ID和子事务对象的映射组
private static Map<String,ZhuziTx> ZHUZI_TRANSACTIONAL_MAP = new HashMap();
/**
* 向事务管理者中发送一个创建事务组的命令
* @return
*/
public static String createZhuziTransactionalManagerGroup(){
// 随机产生一个UUID作为事务组ID
String groupID = UUID.randomUUID().toString();
// 通过JSON做序列化
JSONObject sendData = new JSONObject();
// 传入前面产生的事务组ID,以及本次操作为create创建指令
sendData.put("groupId", groupID);
sendData.put("command", "create");
// 调用客户端的send()方法向服务端发送数据
nettyClient.send(sendData);
System.out.println(">>>>>向管理者发送创建事务组命令成功<<<<<");
// 把事务组ID存在currentGroupId当中
currentGroupId.set(groupID);
// 对外返回事务组ID值
return groupID;
}
/***
* 创建一个子事务对象
*/
public static ZhuziTx createTransactional(String groupId){
// 随机产生一个UUID作为子事务ID
String transactionalId = UUID.randomUUID().toString();
// 示例化出一个子事务对象
ZhuziTx zhuziTransactional = new ZhuziTx(groupId, transactionalId);
// 将创建出的子事务对象保存到相关的变量中
ZHUZI_TRANSACTIONAL_MAP.put(groupId, zhuziTransactional);
current.set(zhuziTransactional);
// 对事务组数量+1
Integer integer = addTransactionCount();
System.out.println("创建子事务,目前事务组长度为:" + integer);
return zhuziTransactional;
}
/**
* 注册事务(向事务管理者的事务组中添加子事务)
*/
public static ZhuziTx addZhuziTransactional(ZhuziTx ztp,
Boolean isEnd, TransactionalType type){
// 通过JSON序列化一个对象
JSONObject sendData = new JSONObject();
// 传入当前子事务的组ID、事务ID、事务类型、操作类型....信息
sendData.put("groupId", ztp.getGroupId());
sendData.put("transactionalId", ztp.getTransactionalId());
sendData.put("transactionalType", type);
sendData.put("command", "add");
sendData.put("isEnd", isEnd);
sendData.put("transactionalCount", ZhuziTxParticipant.getTransactionalCount());
// 将封装好的JSON发送给事务管理者
nettyClient.send(sendData);
System.out.println(">>>>>向管理者发送添加子事务命令成功<<<<<");
return ztp;
}
// 增加事务组数量的方法
public static Integer addTransactionCount() {
System.out.println(transactionalCount.get());
int i = (transactionalCount.get() == null
? 0 : transactionalCount.get()) + 1;
transactionalCount.set(i);
return i;
}
// 省略前面类成员的Get/Set方法.....
}
上述这个类即是事务参与者的核心处理类,里面主要提供了三个核心方法:
createZhuziTransactionalManagerGroup()
:向管理者中申请创建一个事务组。createTransactional()
:根据当前服务的事务情况,创建一个子事务对象。addZhuziTransactional()
:向管理者的指定事务组中添加一个子事务。
同时为了提供给多个服务使用,这个类中的成员基本上都采用ThreadLocal
来修饰,也就是每条执行不同业务的线程,都会拥有自己的current、currentGroupId、transactionalCount
这三个属性,这三个属性会用来辅助完成在管理者中创建事务组、添加子事务的工作。
3.1.3、接管参与者的事务控制权
经过上述的一些流程后,虽然构建出了一些基础组件,但这并不能阻止每个服务各自提交事务,默认情况下,MySQL
执行完一条SQL
语句后会立马提交事务,如若想让MySQL
不自动提交事务,则必须通过begin
之类的方式来手动管理事务。
之前这个工作交给
Spring
来管理,Spring
在业务操作未抛出异常的情况下,则会向MySQL
发送commit
指令,这里也不去完全重写Spring
的事务机制,因为这样会导致工作量尤为巨大,而是基于Spring
原有的事务机制基础上,剥夺掉Spring
主动提交、回滚事务的权限。
那究竟该如何剥夺掉Spring
主动提交、回滚事务的权限呢?这里需要先简单理解一下Spring
事务机制的原理。
之前提到过,
Spring
框架的事务机制,依旧是依赖于数据库自身所提供的事务机制来实现,这也就意味着当被Spring
的@Transactional
注解修饰的方法,执行结束后,Spring
会去调用JDBC
接口中的commit()/rollback()
方法,从而实现事务的提交或回滚。
简单理解上述原理后,那再来思考一下咱们该如何接管事务的控制权呢?局势就十分明朗了,既然Spring
是通过JDBC
接口中的方法,来完成事务提交或回滚的,那咱们只需要撰写一个AOP
切面,去拦截尝试调用对应接口提交/回滚事务的线程即可。
但是问题又又又来了,咱们将调用
JDBC
事务接口的线程拦截后,这还不够,因为这并不能让咱们接管事务的控制权,这仅仅只能剥夺掉Spring
主动提交、回滚事务的权限。但最终每个业务服务(事务参与者)在收到事务管理者最终的处理方案后,依旧需要提交或回滚事务,所以我们的zhuzi-distributed-tx
还需要彻底拿到事务控制权。
但这里该如何去拿到事务控制权呢?其实很简单,这里先写一下AOP
切面的实现,如下:
// 剥夺并接管Spring事务控制权的切面
@Aspect
@Component
public class ZhuziDataSourceAspect {
@Around("execution(* javax.sql.DataSource.getConnection(..))")
public Connection dataSourceAround(ProceedingJoinPoint proceedingJoinPoint)
throws Throwable {
System.out.println("事务切面成功拦截,正在接管控制权......");
// 如果当前调用事务接口的线程正在参与分布式事务,
// 则返回自定义的Connection对象接管事务控制权
if (ZhuziTxParticipant.getCurrent() != null){
System.out.println("返回自定义的Connection对象.......");
Connection connection = (Connection) proceedingJoinPoint.proceed();
return new ZhuziConnection(connection, ZhuziTxParticipant.getCurrent());
}
// 如果当前线程没有参与分布式事务,让其正常提交/回滚事务
System.out.println("返回JDBC的Connection对象.............");
return (Connection) proceedingJoinPoint.proceed();
}
}
这个切面的代码不多,主要是拦截了所有调用DataSource.getConnection()
的线程,然后会进行判断,如果当前线程执行的业务方法,正在参与分布式事务,则返回自定义的数据库连接对象,如果是未参与分布式事务的本地事务操作,则让其正常提交/回滚事务。
那这里为何要去拦截执行
DataSource.getConnection()
这个方法的线程呢?因为Spring
在提交事务时,会先调用该方法获取数据库连接对象,然后通过数据库连接对象中的rollback/commit
方法完成事务回滚或提交,因此可以以该方法作为切入点,从而接管Spring
的事务控制权。
但具体如何接管的呢?核心实现则位于ZhuziConnection
这个自定义的数据库连接类中,如下:
// 自定义的数据库连接类(必须要实现JDBC的Connection接口)
public class ZhuziConnection implements Connection {
// 原本应该返回的数据库连接对象
private Connection connection;
// 存放参与分布式事务的子事务
private ZhuziTx zhuziTx;
// 负责提交事务的线程
private ExecutorService commitT = Executors.newSingleThreadExecutor();
// 负责回滚事务的线程
private ExecutorService rollbackT = Executors.newSingleThreadExecutor();
public ZhuziConnection(Connection connection, ZhuziTx zhuziTx) {
this.connection = connection;
this.zhuziTx = zhuziTx;
}
@Override
public void commit() throws SQLException {
// 交给线程池中的线程来做最终的事务提交
commitT.execute(() -> {
try {
// 阻塞线程,禁止提交
zhuziTx.getTask().waitTask();
// 如果管理者返回事务可以提交,则提交事务
if (zhuziTx.getTransactionalType().equals(TransactionalType.commit)) {
System.out.println("\n收到管理者最终决断:提交事务中\n");
connection.commit();
System.out.println("\n子事务提交事务成功...\n");
}
// 否则调用rollback()方法回滚事务
else {
System.out.println("\n收到管理者最终决断:回滚事务中...\n");
connection.rollback();
System.out.println("\n子事务回滚事务成功...\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
});
}
@Override
public void rollback() throws SQLException {
// 交给线程池中的线程来做最终的事务回滚
rollbackT.execute(() -> {
zhuziTx.getTask().waitTask();
try {
connection.rollback();
System.out.println("\n\n子事务回滚事务成功...\n\n");
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
});
}
@Override
public void close() throws SQLException {
connection.close();
}
// 省略其他Connection接口需要实现的方法......
}
这个自定义的数据库连接类,必须要实现JDBC
中的Connection
接口,毕竟当线程在尝试调用DataSource.getConnection()
获取连接时,通过切面返回一个并非Connection
类型的对象回去,这自然会导致程序报错,同时咱们依旧需要通过原本的连接对象,实现事务最终的提交或回滚,因此也要将原本应该返回的Connection
对象注入到自定义的连接类中。
对于接口
Connection
中的其他方法,直接调用原本连接对象的方法来执行即可,咱们想要接管事务的控制权,唯一需要重写的就是rollback()/commit()
这两个方法,接着来看看重写后的两个方法。
这里先将代码截出来便于理解,但想要理解这两段代码,需要具备一定程度的多线程编程基础,否则会十分困惑,首先要记住:无论执行哪个方法,都会有两条线程而并非一条!当一条线程执行完业务方法后,接着会根据Spring
的事务机制,来获取连接对象调用commit/rollback
方法结束事务,而这里咱们将commit/rollback
方法改成了向线程池中提交一个任务,那也就意味着:业务线程向对应的线程池提交方法后,会立即返回,这里业务线程并不会被阻塞。
而这里不阻塞,阻塞的是什么呢?其实被阻塞住的是线程池中的线程,这些线程会阻塞至事务管理者返回最终决断时,才会继续往下执行(后续代码中会唤醒这些线程),这样就做到了即不阻塞业务线程,又没有真正提交/回滚事务,从而真正的拿到了事务控制权,到底啥时候提交/回滚事务,这完全可以由咱们自己决定。
但注意:阻塞线程的方法是调用了子事务对象中的Task.waitTask()
方法,还记得咱们之前定义的子事务对象嘛?其中有一个这样的成员:
这个成员的实现类如下:
// 子事务的等待队列:基于此实现事务控制权
public class Task {
// 通过ReentrantLock的Condition条件等待队列实现线程阻塞/唤醒
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// 阻塞挂起线程的方法
public void waitTask(){
System.out.println("事务控制权已经被拦截挂起........");
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 唤醒放下线程的方法
public void signalTask(){
System.out.println("事务控制权已经被拦截放下........");
lock.lock();
condition.signal();
lock.unlock();
}
}
这个Task
类的实现尤为简单,内部通过ReentrantLock
的Condition
多条件等待队列,实现waitTask()
阻塞线程、signalTask
唤醒线程这两个方法,当Spring
尝试提交/回滚参与者的事务时,由于使用的是咱们自定义的连接对象,因此调用commit/rollback
方法后并不会真正结束事务,而是会把提交/回滚事务的工作交给线程池完成。
当线程池收到业务线程提交的任务后,会首先挂起自身线程,等待后续出现唤醒指令时,才会真正的执行
commit/rollback
操作。
那这里的线程被阻塞后,到底什么时候会唤醒呢?也就是多久才会真正的提交/回滚事务呢?这里需要回到之前的NettyClientHandler.channelRead()
方法,如下:
// 当事务管理者返回最终决断时,该方法会被触发,进而会执行这个方法的代码
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("接收到事务管理者的最终决断:" + msg.toString());
// 反序列化解析JSON数据
JSONObject data = JSON.parseObject((String) msg);
String groupId = data.getString("groupId");
String command = data.getString("command");
System.out.println("接收command:" + command);
// 对事务进行操作
ZhuziTx zhuziTx = ZhuziTxParticipant.getZhuziTransactional(groupId);
// 如果事务管理者最终决定提交事务
if ("commit".equals(command)){
// 根据groupID找到子事务并设置commit状态
zhuziTx.setTransactionalType(TransactionalType.commit);
}
// 如果事务管理者最终决定回滚事务
else{
// 根据groupID找到子事务并设置rollback回滚状态
zhuziTx.setTransactionalType(TransactionalType.rollback);
}
// 唤醒在之前阻塞的、负责提交/回滚事务的线程
zhuziTx.getTask().signalTask();
}
当参与者收到管理者的最终通知后,根据事务管理者的最终决断来设置事务状态,然后再唤醒前面阻塞的线程,真正执行提交或回滚事务的操作:
- 如果管理者的通知为
commit
,这里会将子事务的状态设为TransactionalType.commit
。 - 否则这里会将子事务的状态设为
TransactionalType.rollback
。
channelRead()
方法被触发后,最终会调用signalTask()
唤醒前面阻塞的线程,前面阻塞的线程被唤醒后,会接着执行if (zhuziTx.getTransactionalType().equals(TransactionalType.commit))
这行代码,也就是判断管理者最终给出的决断是否为commit
,如果是则提交当前子事务,否则会调用connection.rollback()
方法回滚当前子事务。
看到这里,对于事务最终是如何提交或回滚的,相信大家已经明白了其中原理,但一通代码看下来大家估计有些绕,所以接着上一个流程图,帮大家总结一下这个过程,如下:
3.1.4、自定义分布式事务注解
为了保障对业务代码的零侵入性,这里使用自定义注解来实现参与者嵌入业务服务的功能,当其他子服务需要使用时,只需要在对应的方法上加上一个注解即可,自定义注解如下:
// 自定义的分布式事务注解
@Target({
ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ZhuziTransactional {
// 标识当前是全局事务的开启者
boolean isStart() default false;
// 标识当前是全局事务的结束者
boolean isEnd() default false;
}
这个注解中有两个值可选,isStart=true
表示当前被该注解修饰的方法,是一个分布式事务中的第一个业务操作,isEnd=true
则代表是最后一个业务操作,但光定义注解是没有意义,接着还需要通过AOP
切面来拦截使用该注解的方法,切面如下:
// 负责拦截自定义注解的切面
@Aspect
@Component
public class ZhuziTransactionalAspect implements Ordered {
@Around("@annotation(com.zhuzi.distributedtx.annotation.ZhuziTransactional)")
public Integer invoke(ProceedingJoinPoint proceedingJoinPoint){
System.out.println("分布式事务注解生效,切面成功拦截............");
// 获取对应注解的业务方法,以及方法上的注解对象
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = signature.getMethod();
ZhuziTransactional zta = method.getAnnotation(ZhuziTransactional.class);
// 创建事务组
String groupId = "";
// 如果目前触发切面的方法,是一组全局事务的第一个子事务
if (zta.isStart()){
// 则向事务管理者注册一个事务组
groupId = ZhuziTxParticipant.createZhuziTransactionalManagerGroup();
}
// 否则获取当前事务所属的事务组ID
else {
groupId = ZhuziTxParticipant.getCurrentGroupId();
}
// 创建子事务
ZhuziTx zhuziTx = ZhuziTxParticipant.createTransactional(groupId);
// spring会开启MySQL事务
try {
//执行spring切面(dataSource切面),执行具体的业务方法
Object result = proceedingJoinPoint.proceed();
// 没有抛出异常证明该事务可以提交,把子事务添加进事务组
ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
TransactionalType.commit);
// 返回执行成功的结果
return (Integer) result;
} catch (Exception e){
e.printStackTrace();
// 抛出异常证明该事务需要回滚,把子事务添加进事务组
ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
TransactionalType.rollback);
} catch (Throwable throwable) {
throwable.printStackTrace();
// 把子事务添加进事务组,抛出异常证明该事务需要回滚
ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
TransactionalType.rollback);
// 返回执行失败的结果
return -1;
}
return -1;
}
// 设置优先级,让前面拦截事务的切面先执行
@Override
public int getOrder() {
return 10000;
}
}
这个切面代码并不多,但逻辑相对来说也并不简单,整个方法执行的核心逻辑如下:
- ①通过反射机制获取自定义注解修饰的
Method
方法对象,以及注解对象自身。 - ②判断业务方法上注解的值,看看是
isStart
是否为True
:- 为
true
,表示触发切面的业务方法,是分布式事务中的第一个业务操作,所以会先向管理者申请创建一个事务组,并获取事务组ID
。 - 不为
true
,判断当前子事务应该属于哪个事务组,获取事务组ID
。
- 为
- ③通过前面拿到的事务组
ID
,调用createTransactional()
方法实例化一个子事务对象。 - ④通过
AOP
中的proceedingJoinPoint.proceed()
方法,执行切面拦截的具体业务操作。 - ⑤如果业务操作执行过程中没有抛出异常,则向管理者的事务组中添加一个
commit
状态的子事务。 - ⑥如果业务操作执行过程中抛出异常,则向管理者的事务组中添加一个
rollback
状态的子事务。
上述便是整个AOP
切面的核心工作,一句话总结就是:会根据当前子事务的执行状态,向事务管理者的事务组中添加一个子事务。但为了防止这个切面的优先级高过前面的切面,因此也需要重写一下getOrder()
方法,将当前切面的优先级放的低一些,让拦截Spring
事务的切面先执行。
3.1.5、事务组ID是如何在上下游服务中传递的?
不过在这个切面中,有一个细节,即当前子事务是怎么知道自己是属于哪个事务组的呢?在代码中使用了ZhuziTxParticipant.getCurrentGroupId()
获取了当前子事务的组ID
,这个GroupId
是如何传递过来的呢?目前的调用情况如下:
- ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
- ②库存服务中开启一个分布式事务,生成一个全局唯一
ID
,并创建一个事务组。 - ③库存服务根据前面生成得到的组
ID
,把自身事务的执行状态,加入到管理者的事务组中。 - ④库存服务向订单服务返回调用结果,即
OK/200
,以及调用减库存接口成功的信息。 - ⑤订单服务收到调用结果后,继续调用本地的新增订单方法,执行完成后添加执行状态到管理者。
这个过程听着似乎不是特别难对吧?但问题就在于④、⑤之间,订单服务中的「新增订单」执行完成后,又如何知道自己是属于哪个分布式事务组的呢?所以这里需要把库存服务中,生成的groupId
传递过来,这样才能确保两个子事务,会添加到同一个事务组里面。
但这个需求听起来简单,但实现起来却并不容易,无论是通过拦截器、亦或是过滤器,都无法实现这个需求,因为在库存服务执行完成后,响应报文就已经生成,所以在拦截器、过滤器中新增响应头信息,这是无法生效的。
那最终我是如何处理的呢?简单翻阅源码后,这里用到了Spring
框架预留的一个钩子接口:ResponseBodyAdvice
,实现这个接口的钩子类,会在Controller
方法执行完成之后,响应报文组装之前被调用,因此咱们可以在这里织入事务组ID
,实现如下:
// Spring框架预留的钩子接口:织入事务组ID
@ControllerAdvice
public class GroupIdRespAdvice implements ResponseBodyAdvice {
// 钩子类的前置方法:必须为true才会执行beforeBodyWrite()方法
@Override
public boolean supports(MethodParameter methodParameter, Class aClass) {
return true;
}
// Controller方法执行完成之后,响应报文组装之前执行
@Override
public Object beforeBodyWrite(Object body, MethodParameter methodParameter,
MediaType mediaType, Class aClass,
ServerHttpRequest request,
ServerHttpResponse response) {
// 如果ThreadLocal中的事务组ID不为空,代表当前请求参与了分布式事务,
// 会获取对应的事务组ID放入到响应头中(对于普通请求不会改写响应头)
if (ZhuziTxParticipant.getCurrentGroupId() != null){
// 把需要传递的事务组ID、子事务数量放入响应头中
response.getHeaders().set("groupId",
ZhuziTxParticipant.getCurrentGroupId());
response.getHeaders().set("transactionalCount",
String.valueOf(ZhuziTxParticipant.getTransactionCount()));
}
return body;
}
}
这样处理之后,就可以在上游服务的请求出口,为每个涉及分布式事务的请求添加上一个响应头信息,在响应头中会传输下游服务所需的事务组ID
、组中子事务数量信息,接着还需要在下游服务的响应入口,获取这些请求头信息,实现如下:
// HttpClient远程调用工具
public class HttpClient {
// GET请求的方法
public static String get(String url) {
String result = "";
try {
// 创建一个httpClient对象,并调用传入的URL接口
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet httpGet = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(httpGet);
// 如果调用结果是返回OK,状态码为200
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
// 获取response对象中的所有响应头
Header[] allHeaders = response.getAllHeaders();
for (Header header : allHeaders) {
// 从中找到上游服务传递的组ID、事务数量,并赋值给自己的子事务
if ("groupId".equals(header.getName())){
String groupId = header.getValue();
ZhuziTxParticipant.setCurrentGroupId(groupId);
}
if ("transactionalCount".equals(header.getName())){
String transactionalCount = header.getValue();
ZhuziTxParticipant.setTransactionCount(
Integer.valueOf(transactionalCount == null ?
"0" : transactionalCount));
}
}
// 向调用方返回上游服务最终的调用结果
result = EntityUtils.toString(response.getEntity(), "utf-8");
}
response.close();
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
}
因为我这里是通过HttpClient
来实现远程调用的,所以我只需要在调用结束后,读取response
对象的请求头信息,然后获取其中的事务组ID
,并保存到自己的ThreadLocal
中即可。
但如果是用了
Dubbo、gRPC、Fegin、RestTemplate....
等远程调用的方式,大家可自行根据RPC
工具的类型,去编写Filter
过滤器截断响应结果,然后获取响应头中的数据,接着放入自己的ThreadLocal
中即可。
这个地方的本质实现就是分布式系统中,按调用链路去依次传递一个全局共享数据,在上游服务的出口写入响应头信息、下游服务的入口获取响应头信息即可。
我这里的
groupId、transactionalCount
都是放入到ZhuziTxParticipant
的ThreadLocal<String> currentGroupId
成员中,因为这里使用了ThreadLocal
来存储,所以多个分布式事务一起执行的情况,依旧不会冲突。
3.1.6、事务参与者的收尾工作
到这里,事务参与者就完整的实现出来了,但为了提供给其他业务子服务使用,因此咱们还需要执行一下mvn -package
命令,将当前实现好的zhuzi-distributed-tx
框架,打包到Maven
本地仓库中,后续其他子服务可通过下述GAV
坐标导入依赖:
<dependency>
<groupId>com.zhuzi</groupId>
<artifactId>zhuzi-distributed-tx</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
3.2、事务管理者的核心实现
在前面的过程中,咱们只基于Netty
构建出了一个最基本的服务端,但对于事务管理者的核心逻辑还并未开始实现,因此现在开始撰写管理者的核心实现,也就是回到NettyServerHandler.channelRead()
方法中,实现核心的逻辑,代码如下:
/***
*
* 创建事务组,并且添加保存事务
* 并且需要判断,如果所有事务都已经执行了(有结果了,要么提交,要么回滚)
* 如果其中有一个事务需要回滚,那么通知所有客户进行回滚,否则则通知所有客户端进行提交
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接受数据:" + msg.toString());
JSONObject jsonObject = JSON.parseObject((String)msg);
// create:创建一个事务组,add:添加事务
String command = jsonObject.getString("command");
// 事务组ID
String groupId = jsonObject.getString("groupId");
// 子事务类型(commit:待提交、rollback:待回滚)
String transactionType = jsonObject.getString("transactionalType");
// 事务数量(当前这个全局事务的总参与者数量)
Integer transactionCount = jsonObject.getInteger("transactionalCount");
// 是否结束事务(是否为最后一个事务)
Boolean isEnd = jsonObject.getBoolean("isEnd");
// 如果参与者发来的是create指令,则创建一个事务组
if ("create".equals(command)){
transactionTypeMap.put(groupId, new ArrayList<String>());
}
// 如果参与者是add操作,则将对应子事务加入事务组
else if ("add".equals(command)){
transactionTypeMap.get(groupId).add(transactionType);
// 判断当前子事务是否为整组最后一个事务
if (isEnd) {
// 是则声明本组事务已结束
isEndMap.put(groupId, true);
transactionCountMap.put(groupId, transactionCount);
} else {
// 否则声明后续依旧会有事务到来
isEndMap.put(groupId, false);
transactionCountMap.put(groupId, transactionCount);
}
// 调试时的输出信息
System.out.println("isEndMap长度:" + isEndMap.size());
System.out.println("transactionCountMap长度:" + transactionCountMap.get(groupId));
System.out.println("transactionTypeMap长度:" + transactionTypeMap.get(groupId).size());
JSONObject result = new JSONObject();
result.put("groupId",groupId);
// 如果已经接收到结束事务的标记,则判断事务是否已经全部到达
if (isEndMap.get(groupId) &&
transactionCountMap.get(groupId)
.equals(transactionTypeMap.get(groupId).size())){
// 如果已经全部到达则看是否需要回滚
if (transactionTypeMap.get(groupId).contains("rollback")){
System.out.println("事务最终回滚..........");
result.put("command","rollback");
sendResult(result);
// 如果一组事务中没有任何事务需要回滚,则提交整组事务
} else {
System.out.println("事务最终提交..........");
result.put("command","commit");
sendResult(result);
}
}
}
}
之前聊到过,客户端(参与者)所有发送给服务端(管理者)的数据,都会被这个channelRead()
方法监听到,也就是每当有客户端给服务端发送数据时,都会触发这个方法执行,因此咱们只需在这个方法中实现核心逻辑即可,代码逻辑如下:
- ①通过
JSON
反序列化,解析客户端(参与者)发送过来的数据包。 - ②如果参与者数据包的
command=create
,则先创建一个事务组。 - ③如果参与者数据包的
command=add
,则将对应子事务的执行状态添加进事务组。 - ④将子事务添加进事务组后,接着判断一下
isEnd
是否为true
:- 否:继续等待其他子事务的到来。
- 是:进入第⑤步,对一个分布式事务进行最终处理。
- ⑤判断整组事务中是否包含
rollback
,只要有一个子事务的状态为rollback
,整组事务都需要回滚,反之则提交。 - ⑥最后构建一个
JSON
数据包,并调用sendResult()
方法,把管理者的最终决断通知给每个参与者。
上面是整个事务管理者的核心逻辑,简单来说其实就两个功能:
- 根据参与者数据包中的
command
指令,来创建事务组或添加子事务。 - 在一组事务全部已到达后,判断整组事务最终到底要回滚还是提交。
相较于事务参与者的实现来说,事务管理者的代码还比较简单,接着来做个简单的测试。
3.3、测试自定义的分布式事务框架
前面实现了事务参与者和事务管理者的核心功能后,接着在对应的业务服务中引入zhuzi-distributed-tx
框架的依赖:
<dependency>
<groupId>com.zhuzi</groupId>
<artifactId>zhuzi-distributed-tx</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
然后先在库存服务service
层的扣减库存方法上,加上一个@ZhuziTransactional
注解:
@Override
@Transactional
// 这里要写上isStart = true,因为这是第一个业务操作
@ZhuziTransactional(isStart = true)
public Integer updateByPrimaryKeySelective(Inventory record) {
int i = inventoryMapper.updateByPrimaryKeySelective(record);
return i;
}
接着在订单服务service
层的新增订单方法上,要同样加一个@ZhuziTransactional
注解:
@Override
@Transactional
// 这里要写上isEnd = true,因为这是最后一个业务操作
@ZhuziTransactional(isEnd = true)
public Integer insertSelective(Order record) {
// 刻意抛出一个异常
int i = 100 / 0;
int n = orderMapper.insertSelective(record);
System.out.println("\n\n\n" + n + "\n\n\n");
return n;
}
接着分别启动事务管理者、注册中心、库存服务、订单服务四个进程,然后开始测试,依旧通过浏览器调用之前的下单接口:
http://localhost:8001/order/placeAnOrder
此时重点观察控制台的日志输出,来看看结果,事务管理者的控制台输出如下:
事务参与者-库存服务的控制台输出如下:
事务参与者-订单服务的控制台输出如下:
上述三个日志输出中,重点观察事务管理者的输出,整个流程如下:
当事务管理者完成图中第七步后,接着事务参与者(业务服务)这边会收到来自管理者的通知,各自把自己子事务(「扣减库存、新增订单」)回滚,最后来看看数据库的表数据:
最终会发现,在两个数据库中,数据依旧没有发生变化,库存表中的数据依旧是99
,而订单表中也没有新增订单数据,最终做到了数据的完全一致性,从而解决了分布式事务造成的数据不一致问题。
四、分布式事务手写/原理篇总结
经过前面三个阶段的阐述后,咱们一点点的从分布式事务问题引出、演示,再到逐步去推敲手写分布式事务框架的思路,再慢慢的手写出了所有代码,最终成功解决了分布式事务问题,这个过程相对来说也并不轻松,尤其是一些底子较弱的小伙伴,阅读起来可能存在很大压力,所以在最后再完整总结一下:
- ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
- ②库存服务中开启一个分布式事务,生成一个全局唯一
ID
,并创建一个事务组。 - ③库存服务根据前面生成得到的组
ID
,把自身事务的执行状态(commit
),加入到管理者的事务组中。 - ④库存服务向订单服务返回调用成功,并且通过
Spring
钩子类,将groupId
放到响应头中。 - ⑤订单服务收到调用结果后,从响应头中拿到事务组
ID
、子事务数量,放到自身的ThreadLocal
中。 - ⑥订单服务继续调用本地的新增订单方法,但由于咱们手动制造了异常,所以执行会报错,最终会根据前面的
groupId
,在管理者的事务组中添加一条执行状态为rollback
的子事务。 - ⑦事务管理者发现「下单」这个分布式事务的所有子事务全部抵达后,接着会进行最终审判,发现其中存在一个
rollback
,然后通知对应的所有事务参与者回滚。 - ⑧库存、订单服务收到事务管理者的最终审判后,最终回滚各自的所有业务操作,确保数据的完全一致性。
咱们写的整个分布式事务框架,其核心处理流程如上,而LCN、Seata-AT
模式的执行流程也大致如此,都是基于数据库的事务机制来实现的,但实际上会比咱们这个更加复杂很多倍,会牵扯到资源管理者、全局锁等概念。
同时真正的分布式事务框架中,都只会有一个分布式事务注解,生成全局事务ID
的操作,会放到最开始完成,然后向下进行传递,默认是最后一个子事务来结束整组事务操作,伪逻辑如下:
@GlobalTransactional // ①会在这里先生成全局事务ID
public String placeAnOrder(String shopID){
RPC.减库存接口(); // ②传递全局ID,减库存执行完成后,会根据全局ID添加一个子事务
Local.新增订单方法(); // ③传递全局ID,新增订单执行后,再向事务组添加一个子事务
// ④因为后面没有其他操作了,默认会结束这组分布式事务,进行最终的提交/回滚操作
}
但咱们设计的这款分布式事务框架,则设计出了两个分布式事务注解,用isStrart
来开启分布式事务 isEnd
来结束事务,这里主要是能让大家更便于理解分布式事务的核心原理,不过这种做法并不完善。
但我们的目的并不是打造一款商用框架,而是摒弃繁枝末节,真正理解分布式事务框架的核心原理,因此我就不继续去完善
zhuzi-distributed-tx
这个“分布式事务框架”啦~,大家感兴趣的可自行Down
下源码,这里我附上GitHub
的源码地址:>>>>戳我访问<<<<。
源码中涵盖了整个业务系统和分布式事务框架的完整实现,但为了快速搭建,所以对于微服务项目的架构并不全面,如RPC
框架用的HttpClient
,GateWay
网关也没有,限流熔断也没做,注册中心依旧用的是Eureka....
,还是那句话,大家有兴趣可自行完善~