(二十七)舞动手指速写一个Seata-XA框架解决棘手的分布式事务问题

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。

引言

相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。

例如一个下单业务中,假设由「扣减库存、新增订单」两个操作组成,在单库中通过MySQL提供的事务机制,能够确保该事务中任意操作执行出现问题时,另一个操作变更的数据可以回滚,从而确保整库数据的一致性,避免产生库存扣了,但订单却未增加的情况出现。

在传统的单体架构中做单库开发,数据的一致性可以通过InnoDB事务机制来保障,但当项目换到分布式架构的环境时,或者当项目换到分库分表的环境时,答案亦是如此吗?并非如此,在分布式环境下,由于每个库都维护着自己的事务机制,相互之间无法感知对方的事务,因此就会出现分布式事务问题,这也是分布式系统中头疼多年的一个棘手问题!

本章的核心则是讲清楚分布式事务问题,以及该如何去解决这种棘手问题,但实际目前对于分布式事务的解决方案已经十分成熟了,即Spring Cloud Alibaba中的Seata框架,以及更早期的GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...等框架都能够很好的解决分布式事务问题。

也正由于分布式事务问题的解决方案已经比较完善,基本上一个注解、几行代码、几行配置的事情,就能够轻松解决分布式事务问题,因此本章并非单纯去讲述这些框架的基本使用,而是从另一个角度来思考分布式事务问题,即假设没有这些成熟的解决问题,咱们遇到这个问题时又该如何处理呢?接下来会与大家一起,手把手的自己编写一个分布式事务框架。

因为涉及到了分布式事务框架的手写,可能内容会比较偏向于底层原理的分享,我会尽量在把这些内容写的简单一点,同时对于每个核心段落也会画图示例,但本身这个题材就比较硬核,因此想要彻底读懂这章,最好具备如下基础:

  • 分布式知识储备:主要是指SpringCloud微服务与RPC远程调用的基本使用。
  • 网络知识储备:主要是指Netty框架的使用、序列化知识、P2P网络通信的原理。
  • Spring相关的知识储备:主要是Transactional事务机制原理、AOP切面使用。
  • Java-JUC并发编程的知识储备:主要是ThreadLocal、Condition、Lock、线程池技术。
  • 其他的知识储备:主要是指自定义注解式开发、Maven打包技术、MySQL事务原理。

如若大家不具备上述基础,实则也无需担心,通篇读下来应该大致原理也能够弄懂,本章要做的是让诸位知其然并知其所以然,对技术不要停留在单纯的使用层面,而应该适当性的去参悟底层的实现原理,这才是大家与其他开发者拉开差距的核心竞争力。

因为个人还并未阅读过Seata框架的源码,因此本章是之前在阅读LCN这个老牌分布式事务框架仿写的,所以很多实现是借鉴于LCN的部分实现,但LCNSeata-AT模式大致相同,因此诸位也可将本篇当做Seata-AT模式的原理篇来阅读,在本章末尾也会提供源码实现。最后,如若你对于手写框架系列的内容感兴趣,那也可以看看之前曾发布过的《手写SpringMVC框架》这篇文章。

一、何谓分布式事务问题?

首先来聊聊啥是分布式事务问题,因为现在的分布式/微服务系统开发中,基本上每个核心服务都会具备自己的独享库,也就是垂直分库的模式,以前面的例子来说,订单服务有订单DB,库存服务有库存DB,每个服务之间的数据库都是独立的。此时先回顾原本单库环境中解决事务问题的方式,如下:

// 下单服务
@Transactional
public void placeAnOrder(){
   
   
    // 调用扣减库存的方法
    inventoryService.minusInventory();
    // 调用增加订单的方法
    orderService.insertOrder();
}

一个下单业务的伪代码如上,会先调用「扣减库存」的方法,接着再调用「新增订单」的方法,为了确保下单这组操作的数据一致性,通常会在方法上加一个@Transactional注解,这样就会将事务托管给Spring来负责,如下:

  • 在该方法执行时,Spring会首先向数据库发送一条begin开启事务的命令。
  • 如果执行过程中出现异常,Spring会向数据库发送一条rollback回滚事务的命令。
  • 如果执行一切正常,Spring会向数据库发送一条commit提交事务的命令。

Spring注解式事务的逻辑图如下:

001.png

这种事务管理机制,在单体架构中显然十分好用,但放到分布式环境中,情况则不同,如下:

002.png

由于分布式系统都会根据业务去拆分子系统/子服务,因此不同业务之间只能通过RPC的方式,远程调用对方所提供的API接口,假设这里在库存服务本地的「扣减库存」方法上加一个@Transactional注解,同时在订单服务本地的「新增订单」方法也加一个@Transactional注解,Spring内部的处理逻辑如下:

  • 下单业务远程调用「减库存」接口时,Spring会先向库存DB发送一个begin命令开启事务。
  • 当扣减库存的业务执行完成后,Spring会直接向库存DB发送一个commit命令提交事务。
  • 下单业务调用本地的「新增订单」方法时,Spring又会向订单DB发送begin命令开启事务。
  • 当新增订单执行出现异常时,Spring会向订单DB发送一个rollback命令回滚事务。

此时分析如上场景,下单业务理论上应该属于同一个事务,但之前《MySQL事务篇》聊到过,InnoDB的事务机制是基于Undo-log日志实现的,那么减库存产生的回滚记录会记录到库存DBUndo-log中,而新增订单产生的回滚记录则会记录到订单DBUndo-log中,此时由于服务不同、库不同,因此相互之间无法感知到对方的事务。

当后续「新增订单」的操作执行出现异常,Spring框架发送的rollback命令,就只能根据订单DB中的回滚记录去还原数据,此时前面扣减过的库存数据就无法回滚,因此导致了整体数据出现了不一致性。

1.1、分布式事务问题演示

前面简单讲述了分布式事务问题,但这样讲起来似乎有些令人费脑,那接下来直接上个案例,实际感受一下分布式事务造成的数据不一致问题,这里基于SpringCloud快速搭建了一个微服务项目,为了节省篇幅就不带着诸位一起走简单的搭建流程了,完整的源码地址会在最后给出,其中有订单、库存两个子服务,库存服务提供了一个减库存的接口,如下:

@RestController
@RequestMapping("/inventory")
public class InventoryAPI {
   
   

    // 注入本地的InventoryService
    @Autowired
    private InventoryService inventoryService;

    @RequestMapping("/minusInventory")
    public String minusInventory(Inventory inventory) {
   
   
        // 根据传入的商品ID先查询库存
        Inventory inventoryResult =
            inventoryService.selectByPrimaryKey(inventory.getInventoryId());

        // 如果库存不足则返回相应提示
        if (inventoryResult.getShopCount() <= 0) {
   
   
            return "库存不足,请联系卖家....";
        }

        // 如果商品还有剩余库存则对库存减一,接着修改数据库中的库存数量
        inventoryResult.setShopCount(inventoryResult.getShopCount() - 1);
        int n = inventoryService.updateByPrimaryKeySelective(inventoryResult);
        System.out.println("库存信息:" + inventoryResult.toString());

        // 扣减库存成功后,向客户端返回对应的提示
        if (n > 0) {
   
   
            return "端口:" + port + ",库存扣减成功!!!";
        }
        return "端口:" + port + ",库存扣减失败!!!";
    }
}

// 库存服务本地的InventoryService实现类
@Service
public class InventoryServiceImpl implements InventoryService {
   
   

    // 减库存会调用的修改方法,在上面添加了@Transactional注解
    @Override
    @Transactional
    public Integer updateByPrimaryKeySelective(Inventory record) {
   
   
        int i = inventoryMapper.updateByPrimaryKeySelective(record);
        return i;
    }
}

而订单服务中提供了一个下单接口,如下:

@RestController
@RequestMapping("/order")
public class OrderAPI {
   
   

    // 注入本地的OrderService
    @Autowired
    private OrderService orderService;

    // 库存服务的远程调用地址
    private static final String URL_PREFIX =
        "http://localhost:8002/inventory/minusInventory";

    // 负责远程调用的RestTemplate
    @Autowired
    private RestTemplate restTemplate;

    // 下单接口
    @RequestMapping("/placeAnOrder")
    public String placeAnOrder(){
   
   
        // 随便指定一个商品的ID
        String inventoryId = "92b1162a-eb7a-4d72-9645-dea3fe03c8e2";
        // 然后通过HttpClient调用库存服务的减库存接口
        String result = HttpClient.get(URL_PREFIX +
                "/minusInventory?inventoryId=" + inventoryId);
        System.out.println("\n调用减库存接口后的响应结果:" + result + "\n");

        // 调用减库存接口成功后,向订单库中插入一笔订单记录
        String orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId,"黄金竹子","8888.88",inventoryId);
        Integer n = orderService.insertSelective(order);
        System.out.println("\n\n\n" + n + "\n\n\n");

        return "下单调用成功,需要处理事物.....";
    }
}

// 订单服务本地的OrderService实现类
@Service
public class OrderServiceImpl implements OrderService {
   
   

    // 新增订单会调用的插入方法
    @Override
    @Transactional
    public Integer insertSelective(Order record) {
   
   
        // 刻意制造出一个异常
        int i = 100 / 0;
        return orderMapper.insertSelective(record);;
    }
}

要注意看,在orderService.insertSelective(order)插入订单数据的方法中,我们通过100/0手动制造了一个异常,以此来模拟出「扣减库存」执行成功、「新增订单」执行失败的场景,接着看看库存DB、订单DB中对应的库存表、订单表数据,如下:

003.png

很明显,目前订单表中还没有任何数据,而库存表中仅有一条测试数据,但要注意:这两张表分别位于db_inventory、db_order两个不同的库中,此时「黄金竹子」的库存数量为100,现在分别启动库存服务、订单服务来做简单模拟:

  • 订单服务的下单接口:http://localhost:8001/order/placeAnOrder

这里就直接用浏览器做测试,浏览器调用下单接口后,控制台的日志如下:

004.png

两个服务对应的数据库中的数据如下:

005.png

结果十分明显,此时对应商品的库存扣掉了,但由于新增订单时出现异常,所以订单却并未增加,最终造成了数据不一致问题,这也就是前面所说到的分布式事务问题,这也是分布式系统中,需要解决的一个棘手问题。

1.2、该如何解决分布式事务问题呢?

早年间分布式架构并不像如今这么主流,一般只有一些互联网大厂才会使用,因此相关的技术生态和解决方案,并不像那么成熟,而分布式事务问题,也成为了使用分布式架构不得不解决的棘手问题,在分布式事务问题被发现后,期间推出了各种各样的解决方案,但如今保留下来的主流方案共有四种:

  • ①基于Best Efforts 1PC模式解决分布式事务问题。
  • ②基于XA协议的2PC、3PC模式做全局事务控制。
  • ③基于TTC方案做事务补偿。
  • ④基于MQ实现事务的最终一致性。

但上述四种仅是方法论,也就是一些虚无缥缈的理论,想要使用时还得根据其概念去自己落地,但如今分布式/微服务生态已经十分成熟,所以也有很多现成的落地技术,早已能够解决分布式事务问题,如Seata、GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...等框架,都提供了完善的分布式事务支持,目前较为主流的是引入Seata框架解决,其内部提供了AT、TCC、XA、Seaga-XA四种模式,主推的是AT模式,使用起来也较为简单,大体步骤如下:

  • ①引入相关的Maven依赖。
  • ②修改相关的配置文件。
  • ③在需要保障分布式事务的方法上加一个@GlobalTransactional注解。

经过上述三步后,就能够轻松解决早期困扰大厂多年的分布式事务问题,是不是尤为轻松?其他的分布式事务框架使用步骤也相差无几,引入依赖、修改配置、加一个注解即可。

二、手写分布式事务框架的思路分析

前面对分布式事务问题的描述讲了一大堆,但真正要解决对应问题时,似乎仅靠一个注解就够了,这这这......,到底是怎么解决的呢?相信诸多使用Seata或其他分布式事务框架的小伙伴,心中难免都会产生这个疑惑。OK,咱们先假设现在没有这些成熟的分布式事务框架,如果自己要解决分布式事务问题,又该如何去实现呢?那么接下来就重点说说这块,真正让大家做到知其然并知其所以然!

不过做任何事情得先有规划,没有提前做好准备与计划,任何事一般都不会有太好的结果,因此先分析一下手写分布式事务框架的思路,把思路捋清楚之后,接着再去编写对应的代码实现。

前面讲的分布式事务问题,本质原因在于Spring是依靠数据库自身的事务机制来保障数据一致性的,而两个服务对应着两个库,两个库各自都维护着自己的事务机制,同时无法感知对方的事务状况,最终造成库存服务先提交了事务,而订单服务后回滚事务的情况出现。

所以想要自己解决分布式事务问题,首先就不能依靠MySQL自身的事务机制来解决问题,对于事务的管理必须要是全局性质的,也就是需要引入一个第三方来进行全局事务管理,而管理事务的这个角色,我们将其称之为事务管理者。

既然需要把事务交给第三者管理,那么每个参与全局事务的子服务,其事务的控制权必须要拿到,也就是不允许任何一个参与者私自提交或回滚事务,事务的控制权完全交给事务管理者,一组全局事务的结果到底是提交,还是回滚,这点全权由事务管理者决定。

到这里出现了两个角色:事务管理者、事务参与者,所谓的事务参与者,即代表参加一个全局事务的子服务,如前面的下单业务中,库存服务和订单服务,就可以理解成是两个事务参与者。

因为事务参与者要把自己的执行状态告知给管理者,同时管理者需要把事务的最终处理通知给每个参与者,所以管理者、参与者之间要能相互通信,所以等会儿会采用Netty网络框架,实现点对点对端通信。

但为了不影响业务,也就是减小代码的侵入性,对于事务参与者而言,改动的代码量越小越好,所以这里可以用Maven构建一个本地工程,在这个工程中提供一个全局事务注解,然后将该工程打成一个依赖包,其他需要使用分布式事务的子服务,直接在pom.xml中引入该依赖即可,涉及到分布式事务的方法,直接在业务方法上面加上注解。

OK,到这里分析出了大体步骤,大体的逻辑图如下:

006.png

有三个核心步骤:

  • ①实现事务管理者,能够协调全局事务的参与者,等整个业务调用链执行结束后决定事务处理方案。
  • ②实现事务参与者,能够向事务管理者中注册/加入全局事务、告知执行结果、接收最终处理方案。
  • ③构建Maven工程打成依赖包,实现自定义注解,尽量对业务代码做到低入侵。

但上述仅是大体思路,下面来开始逐步实现各个步骤,每个步骤中的细节会慢慢展开。

三、手写分布式事务框架实战

将实现的大概思路弄清楚后,接着先来实现一下事务管理者,实现通过Netty搭建一个服务端,如果对于Netty还不熟悉的小伙伴,后续我会更新Netty框架的文章,这里可以套用Java-NIO的概念去理解,代码如下:

// 事务管理者的启动类
public class Main {
   
   
    public static void main(String[] args){
   
   
        // 这个是自定义的一个服务端
        NettyServer nettyServer = new NettyServer();
        // 为其绑定IP和端口号
        nettyServer.start("localhost", 8080);
        System.out.println("\n>>>>>>事务管理者启动成功<<<<<\n");
    }
}

// Netty服务端 - 事务管理者
public class NettyServer {
   
   
    // 启动类
    private ServerBootstrap bootstrap = new ServerBootstrap();
    // NIO事件循环组
    private NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();

    // 启动方法
    public void start(String host, int port) {
   
   
        try {
   
   
            // 调用下面的初始化方法
            init();
            // 绑定端口和IP
            bootstrap.bind(host, port).sync();
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }

    // 初始化方法
    private void init(){
   
   
        bootstrap.group(nioEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                // 添加一个自定义的处理器
                .childHandler(new ServerInitializer());
    }

    // 关闭方法
    public void close(){
   
   
        nioEventLoopGroup.shutdownGracefully();
        bootstrap.clone();
    }
}

到这里相对来说还比较简单,就是创建了一个服务端,然后绑定了一个IP和端口,最主要是上面NettyServer.init()方法,在里面添加了一个自定义的处理器,该处理器代码如下:

// NIO的通道处理器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
   
   
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
   
   
        // 设置编码器、解码器、处理器
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new NettyServerHandler());
    }
}

前面添加的是解码器和编码器,主要用于数据传输时的编解码工作,重点是要关注最后添加的这个处理器,这个处理器是自定义的,里面会编写处理分布式事务的核心逻辑,基础架构如下:

// 分布式事务的核心处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
   
   

    private static ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 事务组中的事务状态列表
    private static Map<String, List<String>> transactionTypeMap = new ConcurrentHashMap<>();
    // 事务组是否已经接收到结束的标记
    private static Map<String, Boolean> isEndMap = new ConcurrentHashMap<>();
    // 事务组中应该有的事务个数
    private static Map<String, Integer> transactionCountMap = new ConcurrentHashMap<>();

    // 把整个Channel加入到channelGroup中
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   
   
        Channel channel = ctx.channel();
        channelGroup.add(channel);
    }

    /***
     * 这里是待会儿实现分布式事务的核心
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
        // 等会儿逐步实现
    }

    // 向客户端(事务参与者)发送最终处理结果的方法
    private void sendResult(JSONObject result){
   
   
        System.out.println("事务最终处理结果:" + result.toJSONString());
        for (Channel channel : channelGroup){
   
   
            channel.writeAndFlush(result.toJSONString());
        }
    }
}

在这个处理器类中,主要是四个地方需要理解,即里面的三个容器,两个方法:

  • transactionTypeMap:记录一个事务组中,所有子事务的执行状态。
  • isEndMap:记录一个事务组,当前所有的子事务是否已抵达。
  • transactionCountMap:记录一个事务组,应该由多少个子事务组成。
  • handlerAdded():事务管理者的核心方法,所有来自客户端的消息都会被该方法监听到。
  • sendResult():这个方法会在一组事务全部执行完成后,用来给参与者返回处理结果使用。

OK,这里其实并非涉及到任何全局事务的处理,只是基于Netty搭建了一个服务端结构,后续会一点点去展开实现,这样也能够更便于诸位理解,接着再准备着手实现事务参与者。

3.1、事务参与者的实现过程

前面说过,所谓的事务参与者其实就是指每个业务子服务,所以按理来说,事务参与者的代码实现应该放到库存服务、订单服务中,但这样会对业务服务的造成较高的代码侵入性,同时每个需要使用分布式事务的子服务,都需要复制一遍业务参与者的代码,难免有些表现的“。不太智能”。

因此这里将事务参与者的核心代码,单独拧出来构建一个Maven子工程,实现完成后将其打成依赖包放到本地仓库,然后需要使用分布式事务功能的子服务,只需要引入Maven依赖即可(我这里将其命名为zhuzi-distributed-tx,即代表「竹子爱熊猫」开发的一款分布式事务框架,哈哈哈,臭美一下~)。

3.1.1、Netty客户端的实现

作为事务参与者,因为要和事务管理者之间进行通信,而事务管理者的本质是一个Netty-Server服务端,所以这里的事务参与者,本质就是Netty-Client客户端,所以Netty客户端的代码实现如下:

// Netty-Client客户端代码实现
@Component
public class NettyClient implements InitializingBean {
   
   

    // 这个是事务参与者的核心处理器
    private NettyClientHandler client = null;
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    @Override
    public void afterPropertiesSet() throws Exception {
   
   
        System.out.println("\n\n>>>>>>事务参与者启动成功<<<<<<\n\n");
        start("localhost", 8080);
    }

    // 根据IP、端口地址,向服务端注册客户端
    public void start(String host, int port) {
   
   
        client = new NettyClientHandler();
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // 这里添加了一个自定义的处理器
                .handler(new ClientInitializer(client));
        try {
   
   
            bootstrap.connect(host, port).sync();
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
    }

    public void send(JSONObject sendData) {
   
   
        try {
   
   
            // 调用处理器中向服务端发送数据的方法
            client.sendData(sendData);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

结构基本上和事务管理者类似,首先有这么一个启动类,会在业务服务启动时,顺势一起伴随启动,接着会根据地址找到事务管理者并注册,而事务参与者这里同样有一个自定义的处理器,代码如下:

// 自定义的处理器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
   
   

    private NettyClientHandler nettyClientHandler;

    public ClientInitializer(NettyClientHandler nettyClientHandler) {
   
   
        this.nettyClientHandler = nettyClientHandler;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
   
   
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // 这里初始化的是客户端的处理器
        pipeline.addLast("handler", nettyClientHandler);
    }
}

// 事务参与者的核心处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
   
   

    private ChannelHandlerContext channelHandlerContext;

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   
   
        channelHandlerContext = ctx;
    }

    // 所有服务端(事务管理者)返回的数据都会被该方法监听到
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) 
                                                            throws Exception {
   
   
        // 等会儿逐步实现
    }

    // 向服务端(事务管理者)发送数据的方法
    public void sendData(JSONObject result){
   
   
        System.out.println("向事物管理者发送数据:" + result.toJSONString());
        channelHandlerContext.writeAndFlush(result.toJSONString());
    }
}

在事务参与者的处理器中,同样存在两个方法:

  • channelRead():所有来自服务端(事务管理者)的消息都会被这个方法监听到。
  • sendData():客户端(参与者)向服务端(管理者)发送数据的方法。

OK,看到这里相信有些对于Netty不大熟悉的小伙伴会有些懵,那下面画一幅图来简单说明一下目前的结构,就算你不会Netty框架也没关系,只要记住这幅图即可:

007.png

这里实则是基于Netty搭建了一个客户端与服务端的通信架构,这样既能够让参与者向管理者注册事务,也能够让管理者向参与者返回结果,理解这个核心思想就OK

3.1.2、事务参与者的核心实现

搭建出管理者、参与者之间的基础通信架构后,接着来实现事务参与者的核心代码,先定义个枚举类:

// 事务类型的枚举类
public enum TransactionalType {
   
   
    // 提交和回滚状态
    commit, rollback;
}

这个枚举类会用来在后续判断事务状态,它会作为子事务对象的一个子属性,事务对象的定义如下:

// 分布式事务 - 子事务对象
public class ZhuziTx {
   
   
    // 当前子事务属于哪个事务组
    private String groupId;
    // 当前子事务的事务ID
    private String transactionalId;
    // 当前子事务的事务类型
    private TransactionalType transactionalType;
    // 当前子事务的任务等待队列(基于此实现事务控制权)
    private Task task;

    public ZhuziTx(String groupId, String transactionalId, TransactionalType transactionalType) {
   
   
        this.groupId = groupId;
        this.transactionalId = transactionalId;
        this.transactionalType = transactionalType;
        this.task = new Task();
    }

    // 省略其他构造方法、以及get/set方法......
}

每个参与全局事务的子事务,在这里都会被封装为一个个ZhuziTx对象,每个子事务具备下述几个属性:

  • groupId:表示当前子事务属于那一组全局事务,因为同时可能存在多组分布式事务。
  • transactionalId:当前子事务的事务ID值,具备全局唯一特性。
  • transactionalType:这个是前面的枚举类,表示当前子事务最终的执行状态(回滚/提交)。
  • task:这个目前暂时不会用到,后面用来实现事务提交或回滚的控制权。

了解这个基本的子事务对象后,接着再来看看事务参与者的核心实现类:

// 事务参与者的核心实现类
public class ZhuziTxParticipant {
   
   
    // 获取前面伴随服务启动产生的NettyClient客户端
    private static NettyClient nettyClient = 
            ApplicationContextProvider.getBean(com.zhuzi.distributedtx.netty.NettyClient.class);

    // 存储当前线程在执行的子事务对象
    private static ThreadLocal<ZhuziTx> current = new ThreadLocal<>();
    // 存储当前子事务所属的事务组ID值
    private static ThreadLocal<String> currentGroupId = new ThreadLocal<>();
    // 存储当前子事务所属的事务组子事务总量
    private static ThreadLocal<Integer> transactionalCount = new ThreadLocal<>();
    // 事务ID和子事务对象的映射组
    private static Map<String,ZhuziTx> ZHUZI_TRANSACTIONAL_MAP = new HashMap();


    /**
     *  向事务管理者中发送一个创建事务组的命令
     * @return
     */
    public static String createZhuziTransactionalManagerGroup(){
   
   
        // 随机产生一个UUID作为事务组ID
        String groupID = UUID.randomUUID().toString();
        // 通过JSON做序列化
        JSONObject sendData = new JSONObject();
        // 传入前面产生的事务组ID,以及本次操作为create创建指令
        sendData.put("groupId", groupID);
        sendData.put("command", "create");
        // 调用客户端的send()方法向服务端发送数据
        nettyClient.send(sendData);
        System.out.println(">>>>>向管理者发送创建事务组命令成功<<<<<");
        // 把事务组ID存在currentGroupId当中
        currentGroupId.set(groupID);
        // 对外返回事务组ID值
        return groupID;
    }

    /***
     *  创建一个子事务对象
     */
    public static ZhuziTx createTransactional(String groupId){
   
   
        // 随机产生一个UUID作为子事务ID
        String transactionalId = UUID.randomUUID().toString();
        // 示例化出一个子事务对象
        ZhuziTx zhuziTransactional = new ZhuziTx(groupId, transactionalId);
        // 将创建出的子事务对象保存到相关的变量中
        ZHUZI_TRANSACTIONAL_MAP.put(groupId, zhuziTransactional);
        current.set(zhuziTransactional);
        // 对事务组数量+1
        Integer integer = addTransactionCount();
        System.out.println("创建子事务,目前事务组长度为:" + integer);
        return zhuziTransactional;
    }

    /**
     * 注册事务(向事务管理者的事务组中添加子事务)
     */
    public static ZhuziTx addZhuziTransactional(ZhuziTx ztp,
                                                Boolean isEnd, TransactionalType type){
   
   
        // 通过JSON序列化一个对象
        JSONObject sendData = new JSONObject();
        // 传入当前子事务的组ID、事务ID、事务类型、操作类型....信息
        sendData.put("groupId", ztp.getGroupId());
        sendData.put("transactionalId", ztp.getTransactionalId());
        sendData.put("transactionalType", type);
        sendData.put("command", "add");
        sendData.put("isEnd", isEnd);
        sendData.put("transactionalCount", ZhuziTxParticipant.getTransactionalCount());
        // 将封装好的JSON发送给事务管理者
        nettyClient.send(sendData);
        System.out.println(">>>>>向管理者发送添加子事务命令成功<<<<<");
        return ztp;
    }

    // 增加事务组数量的方法
    public static Integer addTransactionCount() {
   
   
        System.out.println(transactionalCount.get());
        int i = (transactionalCount.get() == null
                ? 0 : transactionalCount.get()) + 1;
        transactionalCount.set(i);
        return i;
    }

    // 省略前面类成员的Get/Set方法.....
}

上述这个类即是事务参与者的核心处理类,里面主要提供了三个核心方法:

  • createZhuziTransactionalManagerGroup():向管理者中申请创建一个事务组。
  • createTransactional():根据当前服务的事务情况,创建一个子事务对象。
  • addZhuziTransactional():向管理者的指定事务组中添加一个子事务。

同时为了提供给多个服务使用,这个类中的成员基本上都采用ThreadLocal来修饰,也就是每条执行不同业务的线程,都会拥有自己的current、currentGroupId、transactionalCount这三个属性,这三个属性会用来辅助完成在管理者中创建事务组、添加子事务的工作。

3.1.3、接管参与者的事务控制权

经过上述的一些流程后,虽然构建出了一些基础组件,但这并不能阻止每个服务各自提交事务,默认情况下,MySQL执行完一条SQL语句后会立马提交事务,如若想让MySQL不自动提交事务,则必须通过begin之类的方式来手动管理事务。

之前这个工作交给Spring来管理,Spring在业务操作未抛出异常的情况下,则会向MySQL发送commit指令,这里也不去完全重写Spring的事务机制,因为这样会导致工作量尤为巨大,而是基于Spring原有的事务机制基础上,剥夺掉Spring主动提交、回滚事务的权限。

那究竟该如何剥夺掉Spring主动提交、回滚事务的权限呢?这里需要先简单理解一下Spring事务机制的原理。

之前提到过,Spring框架的事务机制,依旧是依赖于数据库自身所提供的事务机制来实现,这也就意味着当被Spring@Transactional注解修饰的方法,执行结束后,Spring会去调用JDBC接口中的commit()/rollback()方法,从而实现事务的提交或回滚。

简单理解上述原理后,那再来思考一下咱们该如何接管事务的控制权呢?局势就十分明朗了,既然Spring是通过JDBC接口中的方法,来完成事务提交或回滚的,那咱们只需要撰写一个AOP切面,去拦截尝试调用对应接口提交/回滚事务的线程即可。

但是问题又又又来了,咱们将调用JDBC事务接口的线程拦截后,这还不够,因为这并不能让咱们接管事务的控制权,这仅仅只能剥夺掉Spring主动提交、回滚事务的权限。但最终每个业务服务(事务参与者)在收到事务管理者最终的处理方案后,依旧需要提交或回滚事务,所以我们的zhuzi-distributed-tx还需要彻底拿到事务控制权。

但这里该如何去拿到事务控制权呢?其实很简单,这里先写一下AOP切面的实现,如下:

// 剥夺并接管Spring事务控制权的切面
@Aspect
@Component
public class ZhuziDataSourceAspect {
   
   
    @Around("execution(* javax.sql.DataSource.getConnection(..))")
    public Connection dataSourceAround(ProceedingJoinPoint proceedingJoinPoint)
                                                throws Throwable {
   
   
        System.out.println("事务切面成功拦截,正在接管控制权......");

        // 如果当前调用事务接口的线程正在参与分布式事务,
        // 则返回自定义的Connection对象接管事务控制权
        if (ZhuziTxParticipant.getCurrent() != null){
   
   
            System.out.println("返回自定义的Connection对象.......");
            Connection connection = (Connection) proceedingJoinPoint.proceed();
            return new ZhuziConnection(connection, ZhuziTxParticipant.getCurrent());
        }

        // 如果当前线程没有参与分布式事务,让其正常提交/回滚事务
        System.out.println("返回JDBC的Connection对象.............");
        return (Connection) proceedingJoinPoint.proceed();
    }
}

这个切面的代码不多,主要是拦截了所有调用DataSource.getConnection()的线程,然后会进行判断,如果当前线程执行的业务方法,正在参与分布式事务,则返回自定义的数据库连接对象,如果是未参与分布式事务的本地事务操作,则让其正常提交/回滚事务。

那这里为何要去拦截执行DataSource.getConnection()这个方法的线程呢?因为Spring在提交事务时,会先调用该方法获取数据库连接对象,然后通过数据库连接对象中的rollback/commit方法完成事务回滚或提交,因此可以以该方法作为切入点,从而接管Spring的事务控制权。

但具体如何接管的呢?核心实现则位于ZhuziConnection这个自定义的数据库连接类中,如下:

// 自定义的数据库连接类(必须要实现JDBC的Connection接口)
public class ZhuziConnection implements Connection {
   
   
    // 原本应该返回的数据库连接对象
    private Connection connection;
    // 存放参与分布式事务的子事务
    private ZhuziTx zhuziTx;

    // 负责提交事务的线程
    private ExecutorService commitT = Executors.newSingleThreadExecutor();
    // 负责回滚事务的线程
    private ExecutorService rollbackT = Executors.newSingleThreadExecutor();


    public ZhuziConnection(Connection connection, ZhuziTx zhuziTx) {
   
   
        this.connection = connection;
        this.zhuziTx = zhuziTx;
    }

    @Override
    public void commit() throws SQLException {
   
   
        // 交给线程池中的线程来做最终的事务提交
        commitT.execute(() -> {
   
   
            try {
   
   
                // 阻塞线程,禁止提交
                zhuziTx.getTask().waitTask();

                // 如果管理者返回事务可以提交,则提交事务
                if (zhuziTx.getTransactionalType().equals(TransactionalType.commit)) {
   
   
                    System.out.println("\n收到管理者最终决断:提交事务中\n");
                    connection.commit();
                    System.out.println("\n子事务提交事务成功...\n");
                }
                // 否则调用rollback()方法回滚事务
                else {
   
   
                    System.out.println("\n收到管理者最终决断:回滚事务中...\n");
                    connection.rollback();
                    System.out.println("\n子事务回滚事务成功...\n");
                }
            } catch (Exception e) {
   
   
                e.printStackTrace();
            } finally {
   
   
                try {
   
   
                    connection.close();
                } catch (SQLException e) {
   
   
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void rollback() throws SQLException {
   
   
        // 交给线程池中的线程来做最终的事务回滚
        rollbackT.execute(() -> {
   
   
            zhuziTx.getTask().waitTask();
            try {
   
   
                connection.rollback();
                System.out.println("\n\n子事务回滚事务成功...\n\n");
            } catch (SQLException e) {
   
   
                e.printStackTrace();
            } finally {
   
   
                try {
   
   
                    connection.close();
                } catch (SQLException e) {
   
   
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void close() throws SQLException {
   
   
        connection.close();
    }

    // 省略其他Connection接口需要实现的方法......
}

这个自定义的数据库连接类,必须要实现JDBC中的Connection接口,毕竟当线程在尝试调用DataSource.getConnection()获取连接时,通过切面返回一个并非Connection类型的对象回去,这自然会导致程序报错,同时咱们依旧需要通过原本的连接对象,实现事务最终的提交或回滚,因此也要将原本应该返回的Connection对象注入到自定义的连接类中。

对于接口Connection中的其他方法,直接调用原本连接对象的方法来执行即可,咱们想要接管事务的控制权,唯一需要重写的就是rollback()/commit()这两个方法,接着来看看重写后的两个方法。

008.png

这里先将代码截出来便于理解,但想要理解这两段代码,需要具备一定程度的多线程编程基础,否则会十分困惑,首先要记住:无论执行哪个方法,都会有两条线程而并非一条!当一条线程执行完业务方法后,接着会根据Spring的事务机制,来获取连接对象调用commit/rollback方法结束事务,而这里咱们将commit/rollback方法改成了向线程池中提交一个任务,那也就意味着:业务线程向对应的线程池提交方法后,会立即返回,这里业务线程并不会被阻塞

而这里不阻塞,阻塞的是什么呢?其实被阻塞住的是线程池中的线程,这些线程会阻塞至事务管理者返回最终决断时,才会继续往下执行(后续代码中会唤醒这些线程),这样就做到了即不阻塞业务线程,又没有真正提交/回滚事务,从而真正的拿到了事务控制权,到底啥时候提交/回滚事务,这完全可以由咱们自己决定。

但注意:阻塞线程的方法是调用了子事务对象中的Task.waitTask()方法,还记得咱们之前定义的子事务对象嘛?其中有一个这样的成员:

009.png

这个成员的实现类如下:

// 子事务的等待队列:基于此实现事务控制权
public class Task {
   
   
    // 通过ReentrantLock的Condition条件等待队列实现线程阻塞/唤醒
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    // 阻塞挂起线程的方法
    public void waitTask(){
   
   
        System.out.println("事务控制权已经被拦截挂起........");
        lock.lock();
        try {
   
   
            condition.await();
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        } finally {
   
   
            lock.unlock();
        }
    }

    // 唤醒放下线程的方法
    public void signalTask(){
   
   
        System.out.println("事务控制权已经被拦截放下........");
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

这个Task类的实现尤为简单,内部通过ReentrantLockCondition多条件等待队列,实现waitTask()阻塞线程、signalTask唤醒线程这两个方法,当Spring尝试提交/回滚参与者的事务时,由于使用的是咱们自定义的连接对象,因此调用commit/rollback方法后并不会真正结束事务,而是会把提交/回滚事务的工作交给线程池完成。

当线程池收到业务线程提交的任务后,会首先挂起自身线程,等待后续出现唤醒指令时,才会真正的执行commit/rollback操作。

那这里的线程被阻塞后,到底什么时候会唤醒呢?也就是多久才会真正的提交/回滚事务呢?这里需要回到之前的NettyClientHandler.channelRead()方法,如下:

// 当事务管理者返回最终决断时,该方法会被触发,进而会执行这个方法的代码
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) 
        throws Exception {
   
   
    System.out.println("接收到事务管理者的最终决断:" + msg.toString());

    // 反序列化解析JSON数据
    JSONObject data = JSON.parseObject((String) msg);
    String groupId = data.getString("groupId");
    String command = data.getString("command");
    System.out.println("接收command:" + command);

    // 对事务进行操作
    ZhuziTx zhuziTx = ZhuziTxParticipant.getZhuziTransactional(groupId);

    // 如果事务管理者最终决定提交事务
    if ("commit".equals(command)){
   
   
        // 根据groupID找到子事务并设置commit状态
        zhuziTx.setTransactionalType(TransactionalType.commit);
    }
    // 如果事务管理者最终决定回滚事务
    else{
   
   
        // 根据groupID找到子事务并设置rollback回滚状态
        zhuziTx.setTransactionalType(TransactionalType.rollback);
    }

    // 唤醒在之前阻塞的、负责提交/回滚事务的线程
    zhuziTx.getTask().signalTask();
}

当参与者收到管理者的最终通知后,根据事务管理者的最终决断来设置事务状态,然后再唤醒前面阻塞的线程,真正执行提交或回滚事务的操作:

  • 如果管理者的通知为commit,这里会将子事务的状态设为TransactionalType.commit
  • 否则这里会将子事务的状态设为TransactionalType.rollback

channelRead()方法被触发后,最终会调用signalTask()唤醒前面阻塞的线程,前面阻塞的线程被唤醒后,会接着执行if (zhuziTx.getTransactionalType().equals(TransactionalType.commit))这行代码,也就是判断管理者最终给出的决断是否为commit,如果是则提交当前子事务,否则会调用connection.rollback()方法回滚当前子事务。

看到这里,对于事务最终是如何提交或回滚的,相信大家已经明白了其中原理,但一通代码看下来大家估计有些绕,所以接着上一个流程图,帮大家总结一下这个过程,如下:
010.png

3.1.4、自定义分布式事务注解

为了保障对业务代码的零侵入性,这里使用自定义注解来实现参与者嵌入业务服务的功能,当其他子服务需要使用时,只需要在对应的方法上加上一个注解即可,自定义注解如下:

// 自定义的分布式事务注解
@Target({
   
   ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ZhuziTransactional {
   
   
    // 标识当前是全局事务的开启者
    boolean isStart() default false;
    // 标识当前是全局事务的结束者
    boolean isEnd() default false;
}

这个注解中有两个值可选,isStart=true表示当前被该注解修饰的方法,是一个分布式事务中的第一个业务操作,isEnd=true则代表是最后一个业务操作,但光定义注解是没有意义,接着还需要通过AOP切面来拦截使用该注解的方法,切面如下:

// 负责拦截自定义注解的切面
@Aspect
@Component
public class ZhuziTransactionalAspect implements Ordered {
   
   

    @Around("@annotation(com.zhuzi.distributedtx.annotation.ZhuziTransactional)")
    public Integer invoke(ProceedingJoinPoint proceedingJoinPoint){
   
   
        System.out.println("分布式事务注解生效,切面成功拦截............");

        // 获取对应注解的业务方法,以及方法上的注解对象
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        Method method = signature.getMethod();
        ZhuziTransactional zta = method.getAnnotation(ZhuziTransactional.class);

        // 创建事务组
        String groupId = "";
        // 如果目前触发切面的方法,是一组全局事务的第一个子事务
        if (zta.isStart()){
   
   
            // 则向事务管理者注册一个事务组
            groupId = ZhuziTxParticipant.createZhuziTransactionalManagerGroup();
        }
        // 否则获取当前事务所属的事务组ID
        else {
   
   
            groupId = ZhuziTxParticipant.getCurrentGroupId();
        }

        // 创建子事务
        ZhuziTx zhuziTx = ZhuziTxParticipant.createTransactional(groupId);

        // spring会开启MySQL事务
        try {
   
   
            //执行spring切面(dataSource切面),执行具体的业务方法
            Object result = proceedingJoinPoint.proceed();

            // 没有抛出异常证明该事务可以提交,把子事务添加进事务组
            ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                    TransactionalType.commit);

            // 返回执行成功的结果
            return (Integer) result;
        }  catch (Exception e){
   
   
                e.printStackTrace();
                // 抛出异常证明该事务需要回滚,把子事务添加进事务组
                ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                        TransactionalType.rollback);
        } catch (Throwable throwable) {
   
   
            throwable.printStackTrace();
            // 把子事务添加进事务组,抛出异常证明该事务需要回滚
            ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                    TransactionalType.rollback);
            // 返回执行失败的结果
            return -1;
        }
        return -1;
    }

    // 设置优先级,让前面拦截事务的切面先执行
    @Override
    public int getOrder() {
   
   
        return 10000;
    }
}

这个切面代码并不多,但逻辑相对来说也并不简单,整个方法执行的核心逻辑如下:

  • ①通过反射机制获取自定义注解修饰的Method方法对象,以及注解对象自身。
  • ②判断业务方法上注解的值,看看是isStart是否为True
    • true,表示触发切面的业务方法,是分布式事务中的第一个业务操作,所以会先向管理者申请创建一个事务组,并获取事务组ID
    • 不为true,判断当前子事务应该属于哪个事务组,获取事务组ID
  • ③通过前面拿到的事务组ID,调用createTransactional()方法实例化一个子事务对象。
  • ④通过AOP中的proceedingJoinPoint.proceed()方法,执行切面拦截的具体业务操作。
  • ⑤如果业务操作执行过程中没有抛出异常,则向管理者的事务组中添加一个commit状态的子事务。
  • ⑥如果业务操作执行过程中抛出异常,则向管理者的事务组中添加一个rollback状态的子事务。

上述便是整个AOP切面的核心工作,一句话总结就是:会根据当前子事务的执行状态,向事务管理者的事务组中添加一个子事务。但为了防止这个切面的优先级高过前面的切面,因此也需要重写一下getOrder()方法,将当前切面的优先级放的低一些,让拦截Spring事务的切面先执行。

3.1.5、事务组ID是如何在上下游服务中传递的?

不过在这个切面中,有一个细节,即当前子事务是怎么知道自己是属于哪个事务组的呢?在代码中使用了ZhuziTxParticipant.getCurrentGroupId()获取了当前子事务的组ID,这个GroupId是如何传递过来的呢?目前的调用情况如下:

  • ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
  • ②库存服务中开启一个分布式事务,生成一个全局唯一ID,并创建一个事务组。
  • ③库存服务根据前面生成得到的组ID,把自身事务的执行状态,加入到管理者的事务组中。
  • ④库存服务向订单服务返回调用结果,即OK/200,以及调用减库存接口成功的信息。
  • ⑤订单服务收到调用结果后,继续调用本地的新增订单方法,执行完成后添加执行状态到管理者。

这个过程听着似乎不是特别难对吧?但问题就在于④、⑤之间,订单服务中的「新增订单」执行完成后,又如何知道自己是属于哪个分布式事务组的呢?所以这里需要把库存服务中,生成的groupId传递过来,这样才能确保两个子事务,会添加到同一个事务组里面。

但这个需求听起来简单,但实现起来却并不容易,无论是通过拦截器、亦或是过滤器,都无法实现这个需求,因为在库存服务执行完成后,响应报文就已经生成,所以在拦截器、过滤器中新增响应头信息,这是无法生效的。

那最终我是如何处理的呢?简单翻阅源码后,这里用到了Spring框架预留的一个钩子接口:ResponseBodyAdvice,实现这个接口的钩子类,会在Controller方法执行完成之后,响应报文组装之前被调用,因此咱们可以在这里织入事务组ID,实现如下:

//  Spring框架预留的钩子接口:织入事务组ID
@ControllerAdvice
public class GroupIdRespAdvice implements ResponseBodyAdvice {
   
   
    // 钩子类的前置方法:必须为true才会执行beforeBodyWrite()方法
    @Override
    public boolean supports(MethodParameter methodParameter, Class aClass) {
   
   
        return true;
    }

    // Controller方法执行完成之后,响应报文组装之前执行
    @Override
    public Object beforeBodyWrite(Object body, MethodParameter methodParameter,
                                  MediaType mediaType, Class aClass,
                                  ServerHttpRequest request,
                                  ServerHttpResponse response) {
   
   
        // 如果ThreadLocal中的事务组ID不为空,代表当前请求参与了分布式事务,
        // 会获取对应的事务组ID放入到响应头中(对于普通请求不会改写响应头)
        if (ZhuziTxParticipant.getCurrentGroupId() != null){
   
   
            // 把需要传递的事务组ID、子事务数量放入响应头中
            response.getHeaders().set("groupId",
                ZhuziTxParticipant.getCurrentGroupId());
            response.getHeaders().set("transactionalCount",
                String.valueOf(ZhuziTxParticipant.getTransactionCount()));
        }
        return body;
    }
}

这样处理之后,就可以在上游服务的请求出口,为每个涉及分布式事务的请求添加上一个响应头信息,在响应头中会传输下游服务所需的事务组ID、组中子事务数量信息,接着还需要在下游服务的响应入口,获取这些请求头信息,实现如下:

// HttpClient远程调用工具
public class HttpClient {
   
   
    // GET请求的方法
    public static String get(String url) {
   
   
        String result = "";
        try {
   
   
            // 创建一个httpClient对象,并调用传入的URL接口
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpGet httpGet = new HttpGet(url);
            CloseableHttpResponse response = httpClient.execute(httpGet);

            // 如果调用结果是返回OK,状态码为200
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
   
   
                // 获取response对象中的所有响应头
                Header[] allHeaders = response.getAllHeaders();
                for (Header header : allHeaders) {
   
   
                    // 从中找到上游服务传递的组ID、事务数量,并赋值给自己的子事务
                    if ("groupId".equals(header.getName())){
   
   
                        String groupId = header.getValue();
                        ZhuziTxParticipant.setCurrentGroupId(groupId);
                    }
                    if ("transactionalCount".equals(header.getName())){
   
   
                        String transactionalCount = header.getValue();
                        ZhuziTxParticipant.setTransactionCount(
                                Integer.valueOf(transactionalCount == null ?
                                    "0" : transactionalCount));
                    }
                }
                // 向调用方返回上游服务最终的调用结果
                result = EntityUtils.toString(response.getEntity(), "utf-8");
            }
            response.close();
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
        return result;
    }
}

因为我这里是通过HttpClient来实现远程调用的,所以我只需要在调用结束后,读取response对象的请求头信息,然后获取其中的事务组ID,并保存到自己的ThreadLocal中即可。

但如果是用了Dubbo、gRPC、Fegin、RestTemplate....等远程调用的方式,大家可自行根据RPC工具的类型,去编写Filter过滤器截断响应结果,然后获取响应头中的数据,接着放入自己的ThreadLocal中即可。

这个地方的本质实现就是分布式系统中,按调用链路去依次传递一个全局共享数据,在上游服务的出口写入响应头信息、下游服务的入口获取响应头信息即可。

我这里的groupId、transactionalCount都是放入到ZhuziTxParticipantThreadLocal<String> currentGroupId成员中,因为这里使用了ThreadLocal来存储,所以多个分布式事务一起执行的情况,依旧不会冲突。

3.1.6、事务参与者的收尾工作

到这里,事务参与者就完整的实现出来了,但为了提供给其他业务子服务使用,因此咱们还需要执行一下mvn -package命令,将当前实现好的zhuzi-distributed-tx框架,打包到Maven本地仓库中,后续其他子服务可通过下述GAV坐标导入依赖:

<dependency>
    <groupId>com.zhuzi</groupId>
    <artifactId>zhuzi-distributed-tx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

3.2、事务管理者的核心实现

在前面的过程中,咱们只基于Netty构建出了一个最基本的服务端,但对于事务管理者的核心逻辑还并未开始实现,因此现在开始撰写管理者的核心实现,也就是回到NettyServerHandler.channelRead()方法中,实现核心的逻辑,代码如下:

/***
 *
 *  创建事务组,并且添加保存事务
 *  并且需要判断,如果所有事务都已经执行了(有结果了,要么提交,要么回滚)
 *      如果其中有一个事务需要回滚,那么通知所有客户进行回滚,否则则通知所有客户端进行提交
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
    System.out.println("接受数据:" + msg.toString());

    JSONObject jsonObject = JSON.parseObject((String)msg);

    // create:创建一个事务组,add:添加事务
    String command = jsonObject.getString("command");
    // 事务组ID
    String groupId = jsonObject.getString("groupId");
    // 子事务类型(commit:待提交、rollback:待回滚)
    String transactionType = jsonObject.getString("transactionalType");
    // 事务数量(当前这个全局事务的总参与者数量)
    Integer transactionCount = jsonObject.getInteger("transactionalCount");
    // 是否结束事务(是否为最后一个事务)
    Boolean isEnd = jsonObject.getBoolean("isEnd");

    // 如果参与者发来的是create指令,则创建一个事务组
    if ("create".equals(command)){
   
   
        transactionTypeMap.put(groupId, new ArrayList<String>());
    }
    // 如果参与者是add操作,则将对应子事务加入事务组
    else if ("add".equals(command)){
   
   
        transactionTypeMap.get(groupId).add(transactionType);

        // 判断当前子事务是否为整组最后一个事务
        if (isEnd) {
   
   
            // 是则声明本组事务已结束
            isEndMap.put(groupId, true);
            transactionCountMap.put(groupId, transactionCount);
        } else {
   
   
            // 否则声明后续依旧会有事务到来
            isEndMap.put(groupId, false);
            transactionCountMap.put(groupId, transactionCount);
        }

        // 调试时的输出信息
        System.out.println("isEndMap长度:" + isEndMap.size());
        System.out.println("transactionCountMap长度:" + transactionCountMap.get(groupId));
        System.out.println("transactionTypeMap长度:" + transactionTypeMap.get(groupId).size());

        JSONObject result = new JSONObject();
        result.put("groupId",groupId);
        // 如果已经接收到结束事务的标记,则判断事务是否已经全部到达
        if (isEndMap.get(groupId) &&
                transactionCountMap.get(groupId)
                        .equals(transactionTypeMap.get(groupId).size())){
   
   

            // 如果已经全部到达则看是否需要回滚
            if (transactionTypeMap.get(groupId).contains("rollback")){
   
   
                System.out.println("事务最终回滚..........");
                result.put("command","rollback");
                sendResult(result);
            // 如果一组事务中没有任何事务需要回滚,则提交整组事务
            } else {
   
   
                System.out.println("事务最终提交..........");
                result.put("command","commit");
                sendResult(result);
            }
        }
    }
}

之前聊到过,客户端(参与者)所有发送给服务端(管理者)的数据,都会被这个channelRead()方法监听到,也就是每当有客户端给服务端发送数据时,都会触发这个方法执行,因此咱们只需在这个方法中实现核心逻辑即可,代码逻辑如下:

  • ①通过JSON反序列化,解析客户端(参与者)发送过来的数据包。
  • ②如果参与者数据包的command=create,则先创建一个事务组。
  • ③如果参与者数据包的command=add,则将对应子事务的执行状态添加进事务组。
  • ④将子事务添加进事务组后,接着判断一下isEnd是否为true
    • 否:继续等待其他子事务的到来。
    • 是:进入第⑤步,对一个分布式事务进行最终处理。
  • ⑤判断整组事务中是否包含rollback,只要有一个子事务的状态为rollback,整组事务都需要回滚,反之则提交。
  • ⑥最后构建一个JSON数据包,并调用sendResult()方法,把管理者的最终决断通知给每个参与者。

上面是整个事务管理者的核心逻辑,简单来说其实就两个功能:

  • 根据参与者数据包中的command指令,来创建事务组或添加子事务。
  • 在一组事务全部已到达后,判断整组事务最终到底要回滚还是提交。

相较于事务参与者的实现来说,事务管理者的代码还比较简单,接着来做个简单的测试。

3.3、测试自定义的分布式事务框架

前面实现了事务参与者和事务管理者的核心功能后,接着在对应的业务服务中引入zhuzi-distributed-tx框架的依赖:

<dependency>
    <groupId>com.zhuzi</groupId>
    <artifactId>zhuzi-distributed-tx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

然后先在库存服务service层的扣减库存方法上,加上一个@ZhuziTransactional注解:

@Override
@Transactional
// 这里要写上isStart = true,因为这是第一个业务操作
@ZhuziTransactional(isStart = true)
public Integer updateByPrimaryKeySelective(Inventory record) {
   
   
    int i = inventoryMapper.updateByPrimaryKeySelective(record);
    return i;
}

接着在订单服务service层的新增订单方法上,要同样加一个@ZhuziTransactional注解:

@Override
@Transactional
// 这里要写上isEnd = true,因为这是最后一个业务操作
@ZhuziTransactional(isEnd = true)
public Integer insertSelective(Order record) {
   
   
    // 刻意抛出一个异常
    int i = 100 / 0;
    int n = orderMapper.insertSelective(record);
    System.out.println("\n\n\n" + n + "\n\n\n");
    return n;
}

接着分别启动事务管理者、注册中心、库存服务、订单服务四个进程,然后开始测试,依旧通过浏览器调用之前的下单接口:

  • http://localhost:8001/order/placeAnOrder

此时重点观察控制台的日志输出,来看看结果,事务管理者的控制台输出如下:

011.png

事务参与者-库存服务的控制台输出如下:

012.png

事务参与者-订单服务的控制台输出如下:

013.png

上述三个日志输出中,重点观察事务管理者的输出,整个流程如下:

014.png

当事务管理者完成图中第七步后,接着事务参与者(业务服务)这边会收到来自管理者的通知,各自把自己子事务(「扣减库存、新增订单」)回滚,最后来看看数据库的表数据:

015.png

最终会发现,在两个数据库中,数据依旧没有发生变化,库存表中的数据依旧是99,而订单表中也没有新增订单数据,最终做到了数据的完全一致性,从而解决了分布式事务造成的数据不一致问题。

四、分布式事务手写/原理篇总结

经过前面三个阶段的阐述后,咱们一点点的从分布式事务问题引出、演示,再到逐步去推敲手写分布式事务框架的思路,再慢慢的手写出了所有代码,最终成功解决了分布式事务问题,这个过程相对来说也并不轻松,尤其是一些底子较弱的小伙伴,阅读起来可能存在很大压力,所以在最后再完整总结一下:

  • ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
  • ②库存服务中开启一个分布式事务,生成一个全局唯一ID,并创建一个事务组。
  • ③库存服务根据前面生成得到的组ID,把自身事务的执行状态(commit),加入到管理者的事务组中。
  • ④库存服务向订单服务返回调用成功,并且通过Spring钩子类,将groupId放到响应头中。
  • ⑤订单服务收到调用结果后,从响应头中拿到事务组ID、子事务数量,放到自身的ThreadLocal中。
  • ⑥订单服务继续调用本地的新增订单方法,但由于咱们手动制造了异常,所以执行会报错,最终会根据前面的groupId,在管理者的事务组中添加一条执行状态为rollback的子事务。
  • ⑦事务管理者发现「下单」这个分布式事务的所有子事务全部抵达后,接着会进行最终审判,发现其中存在一个rollback,然后通知对应的所有事务参与者回滚。
  • ⑧库存、订单服务收到事务管理者的最终审判后,最终回滚各自的所有业务操作,确保数据的完全一致性。

咱们写的整个分布式事务框架,其核心处理流程如上,而LCN、Seata-AT模式的执行流程也大致如此,都是基于数据库的事务机制来实现的,但实际上会比咱们这个更加复杂很多倍,会牵扯到资源管理者、全局锁等概念。

同时真正的分布式事务框架中,都只会有一个分布式事务注解,生成全局事务ID的操作,会放到最开始完成,然后向下进行传递,默认是最后一个子事务来结束整组事务操作,伪逻辑如下:

@GlobalTransactional // ①会在这里先生成全局事务ID
public String placeAnOrder(String shopID){
   
   
    RPC.减库存接口();  // ②传递全局ID,减库存执行完成后,会根据全局ID添加一个子事务
    Local.新增订单方法(); // ③传递全局ID,新增订单执行后,再向事务组添加一个子事务
    // ④因为后面没有其他操作了,默认会结束这组分布式事务,进行最终的提交/回滚操作
}

但咱们设计的这款分布式事务框架,则设计出了两个分布式事务注解,用isStrart来开启分布式事务 isEnd来结束事务,这里主要是能让大家更便于理解分布式事务的核心原理,不过这种做法并不完善。

但我们的目的并不是打造一款商用框架,而是摒弃繁枝末节,真正理解分布式事务框架的核心原理,因此我就不继续去完善zhuzi-distributed-tx这个“分布式事务框架”啦~,大家感兴趣的可自行Down下源码,这里我附上GitHub的源码地址:>>>>戳我访问<<<<

源码中涵盖了整个业务系统和分布式事务框架的完整实现,但为了快速搭建,所以对于微服务项目的架构并不全面,如RPC框架用的HttpClientGateWay网关也没有,限流熔断也没做,注册中心依旧用的是Eureka....,还是那句话,大家有兴趣可自行完善~

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1天前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
4天前
|
运维 NoSQL Java
SpringBoot接入轻量级分布式日志框架GrayLog技术分享
在当今的软件开发环境中,日志管理扮演着至关重要的角色,尤其是在微服务架构下,分布式日志的统一收集、分析和展示成为了开发者和运维人员必须面对的问题。GrayLog作为一个轻量级的分布式日志框架,以其简洁、高效和易部署的特性,逐渐受到广大开发者的青睐。本文将详细介绍如何在SpringBoot项目中接入GrayLog,以实现日志的集中管理和分析。
33 1
|
16天前
|
数据采集 分布式计算 并行计算
Dask与Pandas:无缝迁移至分布式数据框架
【8月更文第29天】Pandas 是 Python 社区中最受欢迎的数据分析库之一,它提供了高效且易于使用的数据结构,如 DataFrame 和 Series,以及大量的数据分析功能。然而,随着数据集规模的增大,单机上的 Pandas 开始显现出性能瓶颈。这时,Dask 就成为了一个很好的解决方案,它能够利用多核 CPU 和多台机器进行分布式计算,从而有效地处理大规模数据集。
42 1
|
11天前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
28 0
|
1月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
40 2
|
14天前
|
缓存 分布式计算 Java
详细解读MapReduce框架中的分布式缓存
【8月更文挑战第31天】
11 0
|
1月前
|
Go API 数据库
[go 面试] 分布式事务框架选择与实践
[go 面试] 分布式事务框架选择与实践
|
19天前
|
机器学习/深度学习 编译器 PyTorch
自研分布式训练框架EPL问题之吸引社区参与共建如何解决
自研分布式训练框架EPL问题之吸引社区参与共建如何解决
|
19天前
|
并行计算 算法 调度
自研分布式训练框架EPL问题之提高GPU利用率如何解决
自研分布式训练框架EPL问题之提高GPU利用率如何解决
|
19天前
|
算法 异构计算
自研分布式训练框架EPL问题之帮助加速Bert Large模型的训练如何解决
自研分布式训练框架EPL问题之帮助加速Bert Large模型的训练如何解决

热门文章

最新文章