分布式事物-全面详解(学习总结---从入门到深化)(4)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 本案例使用Atomikos框架实现XA强一致性分布式事务,模拟跨库转账的业务场景。不同账户之间的转账操作通过同一个项目程序完成。

XA强一致性分布式事务实战_业务说明

场景介绍

本案例使用Atomikos框架实现XA强一致性分布式事务,模拟跨库转账的业务场景。不同账户之间的转账操作通过同一个项目程序完成。

2345_image_file_copy_370.jpg

说明:

转账服务不会直接连接数据库进行转账操作,而是通过 Atomikos框架对数据库连接进行封装,通过Atomikos框架操作 不同的数据库。由于Atomikos框架内部实现了XA分布式事务协 议,因此转账服务的逻辑处理不用关心分布式事务是如何实现的,只需要关注具体的业务逻辑。

框架选型

2345_image_file_copy_371.jpg

user_account数据表结构

2345_image_file_copy_372.jpg 

设计完数据表后,在192.168.66.100服务器创建2个数据库,分别 为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。

DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户编号'
,
  `account_name` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户
名称'
,
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额'
,
  PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;

添加数据

tx-xa-01库中添加数据。

INSERT INTO `user_account` VALUES ('1001','张三', 10000.00);

tx-xa-02库中添加数据。

INSERT INTO `user_account` VALUES ('1002','李四', 10000.00);

XA强一致性分布式事务实战_项目搭建

创建atomikos-xa项目

2345_image_file_copy_373.jpg

创建依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <!-- druid连接池依赖组件-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 6003
spring:
 autoconfigure:
    #停用druid连接池的自动配置
   exclude:
       com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
 datasource:
    #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
   type: com.alibaba.druid.pool.xa.DruidXADataSource
    #以下是自定义字段
   dynamic:
     primary: master
     datasource:
       master:
         url:
jdbc:mysql://192.168.66.102:3306/tx-xa-01?
useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
         username: root
         password01: 123456
         driver-class-name: com.mysql.jdbc.Driver
       slave:
         url:
jdbc:mysql://192.168.66.102:3306/tx-xa-02?
useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
         username: root
         password01: 123456
         driver-class-name: com.mysql.jdbc.Driver
       validation-query: SELCET 1

编写主启动类

@Slf4j
@SpringBootApplication
@EnableTransactionManagement(proxyTargetClass = true)
public class TxXaStarter {
    public static void main(String[] args){
        SpringApplication.run(TxXaStarter.class,args);
        log.info("*************** TxXaStarter*********");
   }
}

XA强一致性分布式事务实战_多数据源实现

2345_image_file_copy_374.jpg

创建第一个数据源的配置类DBConfig1

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
    private String url;
    private String username;
    private String password;
    private String dataSourceClassName;
}

创建第二个数据源的配置类DBConfig2

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
    private String url;
    private String username;
    private String password;
    private String dataSourceClassName;
}

创建持久层接口

分别在com.itbaizhan.mapper1包和com.itbaizhan.mapper2包下创建UserAccount1Mapper接口和UserAccount2Mapper接口。

public interface UserAccount1Mapper extends BaseMapper<UserAccount> {}
public interface UserAccount2Mapper extends BaseMapper<UserAccount> {}

创建MyBatisConfig1类

MyBatisConfig1类的作用是整合Atomikos框架,读取DBConfig1类 中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "com.itbaizhan.mapper1", sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MyBatisConfig1 {
    @Bean(name = "masterDataSource")
    public DataSource masterDataSource(DBConfig1 dbConfig1) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setUniqueResourceName("masterDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName());
        sourceBean.setTestQuery("select 1");
        sourceBean.setBorrowConnectionTimeout(3);
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUser(dbConfig1.getUsername());
        dataSource.setPassword(dbConfig1.getPassword());
        dataSource.setUrl(dbConfig1.getUrl());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
   }
    @Bean(name = "masterSqlSessionFactory")
 public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
   }
    @Bean(name = "masterSqlSessionTemplate")
    public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory
sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
   }
}

创建MyBatisConfig2类

MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过 MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合 了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "com.itbaizhan.mapper2", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MyBatisConfig2 {
    @Bean(name = "slaveDataSource")
    public DataSource slaveDataSource(DBConfig2 dbConfig2) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setUniqueResourceName("slaveDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig2.getDataSourceClassName());
        sourceBean.setTestQuery("select 1");
        sourceBean.setBorrowConnectionTimeout(3);
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUser(dbConfig2.getUsername());
        dataSource.setPassword(dbConfig2.getPassword());
        dataSource.setUrl(dbConfig2.getUrl());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
   }
    @Bean(name = "slaveSqlSessionFactory")
  public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
   }
    @Bean(name = "slaveSqlSessionTemplate")
    public SqlSessionTemplate slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
   }
}

XA强一致性分布式事务实战_业务层实现

项目的业务逻辑层主要实现具体的跨库转账的业务逻辑,由于具体 的XA跨库分布式事务是由Atomikos框架内部实现的,因此在业务逻 辑层处理跨库转账的逻辑时,就像操作本地数据库一样简单。

创建UserAccount类

@Data
@TableName("user_account")
@AllArgsConstructor
@NoArgsConstructor
public class UserAccount implements
Serializable {
    private static final long serialVersionUID = 6909533252826367496L;
    /**
     * 账户编号
     */
    @TableId
    private String accountNo;
    /**
     * 账户名称
     */
    private String accountName;
    /**
     * 账户余额
     */
    private BigDecimal accountBalance;
}

创建UserAccountService接口

public interface UserAccountService {
      /**
     * 跨库转账
     * @param sourceAccountNo 转出账户
     * @param targetSourceNo 转入账户
     * @param bigDecimal 金额
     */
   void transferAccounts(String sourceAccountNo, String targetSourceNo,BigDecimal transferAmount);
}

实现UserAccountService接口

package com.itbaizhan.service.impl;
import com.itbaizhan.entity.UserAccount;
import com.itbaizhan.mapper1.UserAccountMapper1;
import com.itbaizhan.mapper2.UserAccountMapper2;
import com.itbaizhan.service.IUserAccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-13
*/
@Service
public class UserAccountServiceImpl  implements IUserAccountService {
    @Autowired
    private UserAccountMapper1 userAccountMapper1;
    @Autowired
    private UserAccountMapper2 userAccountMapper2;
    /**
     * 跨库转账
     * @param sourceAccountNo 源账户
     * @param targetSourceNo 目标账户
     * @param bigDecimal 金额
     */
    @Transactional
    @Override
  public void transofer(String sourceAccountNo, String targetSourceNo, BigDecimal bigDecimal) {
        // 1. 查询原账户
        UserAccount sourceUserAccount = userAccountMapper1.selectById(sourceAccountNo);
        // 2. 查询目标账户
        UserAccount targetUserAccount = userAccountMapper2.selectById(targetSourceNo);
        // 3. 判断转入账户和转出账户是否为空
        if (sourceAccountNo != null && targetUserAccount != null){
            // 4. 判断转出账户是否余额不足
            if (sourceUserAccount.getAccountBalance().compareTo(bigDecimal) < 0){
                throw  new RuntimeException("余额不足");
           }
            // 5.更新金额
          sourceUserAccount.setAccountBalance(sourceUserAccount.getAccountBalance().subtract(bigDecimal));
            // 6.张三账户减金额
          userAccountMapper1.updateById(sourceUserAccount);
            System.out.println(10/0);
            // 7.更新金额
          targetUserAccount.setAccountBalance(targetUserAccount.getAccountBalance().add(bigDecimal));
            // 8.张三账户减金额
          userAccountMapper2.updateById(targetUserAccount);
       }
   }
}

分布式架构的理论知识_BASE理论

2345_image_file_copy_375.jpg

为什么会出现BASE理论

CAP 理论表明,对于一个分布式系统而言,它是无法同时满足 Consistency(强一致性)、Availability(可用性) 和 Partition tolerance(分区容忍性) 这三个条件的,最多只能满足其中两个。

2345_image_file_copy_376.jpg

2345_image_file_copy_377.jpg

简介

BASE 理论起源于 2008 年, 由 eBay 的架构师 Dan Pritchett 在 ACM 上发表。

什么是BASE理论

BASE 是 Basically Available(基本可用) 、Soft-state(软状态) 和 Eventually Consistent(最终一致性) 三个短语的缩写。

核心思想:

既是无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

BASE 理论三要素

基本可用(Basically Available)

基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。允许损失部分可用性。但是,这绝不等价于 系统不可用。

软状态(Soft State)

软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。即允许系统在多个不同节点的数据副本存在数据延时。

2345_image_file_copy_378.jpg

注意:

用户在商城下单时,因网络超时等因素,订单处于“支付中”的状 态,待数据最终一致后状态将变更为“关闭”或“成功”状态。

最终一致性(Eventual Consistency)

最终一致性是指系统中的所有数据副本经过一定时间后,最终能够 达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。

2345_image_file_copy_379.jpg

总结

ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论, BASE是 CAP 理论中 AP 方案的延伸。符合Base理论的事务可以称为柔性事务。

分布式事务解决方案_最终一致性分布式事务

2345_image_file_copy_380.jpg

什么是最终一致性事务

强一致性分布式事务解决方案要求参与事务的各个节点的数据时刻 保持一致,查询任意节点的数据都能得到最新的数据结果。这就导 致在分布式场景,尤其是高并发场景下,系统的性能受到影响。而 最终一致性分布式事务解决方案并不要求参与事务的各节点数据时刻保持一致,允许其存在中间状态,只要一段时间后,能够达到数据的最终一致状态即可。

典型方案

为了解决分布式、高并发场景下系统的性能问题,业界基于Base理论提出了最终一致性分布式事务解决方案。

2345_image_file_copy_381.jpg

适用场景

2345_image_file_copy_382.jpg

2345_image_file_copy_383.jpg

优缺点

最终一致性分布式事务解决方案的优点:

2345_image_file_copy_384.jpg

最终一致性分布式事务解决方案的缺点:

2345_image_file_copy_385.jpg

最终一致性分布式事务解决方案_TCC是什么

2345_image_file_copy_386.jpg

概念

TCC(Try-Confirm-Cancel)又称补偿事务。

TCC核心思想

TCC分布式事务最核心的思想就是在应用层将一个完整的事务操作分为三个阶段。在某种程度上讲,TCC是一种资源,实现了Try、 Confirm、Cancel三个操作接口。

2345_image_file_copy_394.jpg

Try阶段

Try阶段是准备执行业务的阶段,在这个阶段尝试执行业务。

2345_image_file_copy_395.jpg

2345_image_file_copy_396.jpg

Confirm阶段

Confirm阶段是确认执行业务的阶段,在这个阶段确认执行的业务。

2345_image_file_copy_397.jpg

Cancel阶段

Cancel阶段取消执行业务。

2345_image_file_copy_398.jpg

TCC核心组成

2345_image_file_copy_399.jpg

Hmily实现TCC分布式事务实战_认识Hmily-TCC

2345_image_file_copy_400.jpg

概述

Hmily是一款高性能,零侵入,金融级分布式事务解决方案,目前 主要提供柔性事务的支持,包含 TCC , TAC (自动生成回滚SQL) 方案, 未来还会支持 XA 等方案。

2345_image_file_copy_401.jpg

2345_image_file_copy_402.jpg

Hmily实现TCC分布式事务实战_业务场景介绍

案例程序分为3个部分

项目公共模块、转出银行微服务和转入银行微服务。转出银行微服 务和转入银行微服务引用项目的公共模块,转出银行微服务作为 TCC分布式事务中的事务发起方,转入银行微服务作为TCC分布式事 务中的事务被动方。

2345_image_file_copy_403.jpg

框架选择

2345_image_file_copy_404.jpg

数据库表设计

在模拟跨行转账的业务场景中,核心服务包括转出银行微服务和转入银行微服务,对应的数据库包括转出银行数据库和转入银行数据库。

user_account账户数据表

字段名称 字段类型 字段名称
account_no varchar(64) 账户编号
account_name varchar(64) 账户名称
account_balance decimal(10,2) 账户余额
fransfer_amount decimal(10,2) 转账金额,用于锁定资源

try_log记录表

2345_image_file_copy_405.jpg

confirm_log记录表

2345_image_file_copy_406.jpg

cancel_log记录表

2345_image_file_copy_407.jpg

接下来,在192.168.66.100服务器的MySQL命令行执行如下命令创建转出银行数据库和数据表。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cancel_log
-- ----------------------------
DROP TABLE IF EXISTS `cancel_log`;
CREATE TABLE `cancel_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
`create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Cancel
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cancel_log
-- ----------------------------
-- ----------------------------
-- Table structure for confirm_log
-- ----------------------------
DROP TABLE IF EXISTS `confirm_log`;
CREATE TABLE `confirm_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Confirm
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of confirm_log
-- ----------------------------
-- ----------------------------
-- Table structure for try_log
-- ----------------------------
DROP TABLE IF EXISTS `try_log`;
CREATE TABLE `try_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT
CURRENT_TIMESTAMP(0) COMMENT '创建时间'
,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Try阶段
执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of try_log
-- ----------------------------
-- ----------------------------
-- Table structure for user_account
-- ----------------------------
DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NOT NULL
COMMENT '账户编号',
  `account_name` varchar(50) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT
NULL COMMENT '账户名称',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  `transfer_amount` decimal(10, 2) NULL DEFAULT
NULL COMMENT '转账金额',
PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = '账户信
息' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of user_account
-- ----------------------------
INSERT INTO `user_account` VALUES ('1001',
'张三', 10000.00, 0.00);
SET FOREIGN_KEY_CHECKS = 1;

在192.168.66.100服务器的MySQL命令行执行如下命令创建转入银行数据库和数据表。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cancel_log
-- ----------------------------
DROP TABLE IF EXISTS `cancel_log`;
CREATE TABLE `cancel_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Cancel阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cancel_log
-- ----------------------------
-- ----------------------------
-- Table structure for confirm_log
-- ----------------------------
DROP TABLE IF EXISTS `confirm_log`;
CREATE TABLE `confirm_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Confirm
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of confirm_log
-- ----------------------------
-- ----------------------------
-- Table structure for try_log
-- ----------------------------
DROP TABLE IF EXISTS `try_log`;
CREATE TABLE `try_log`  (
`tx_no` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '全
局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Try阶段
执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of try_log
-- ----------------------------
-- ----------------------------
-- Table structure for user_account
-- ----------------------------
DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '账户编号',
  `account_name` varchar(50) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT
NULL COMMENT '账户名称',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  `transfer_amount` decimal(10, 2) NULL DEFAULT
NULL COMMENT '转账金额',
  PRIMARY KEY (`account_no`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = '账户信息' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of user_account
-- ----------------------------
INSERT INTO `user_account` VALUES ('1002','李四', 10000.00, 0.00);
SET FOREIGN_KEY_CHECKS = 1;

Hmily实现TCC分布式事务_项目搭建

创建父工程tx-tcc

2345_image_file_copy_408.jpg

设置逻辑工程

<packaging>pom</packaging>

创建公共模块

2345_image_file_copy_409.jpg

创建转出银行微服务

2345_image_file_copy_410.jpg

创建传入银行微服务

2345_image_file_copy_411.jpg

公共模块引入依赖

   <dependencies>
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

Hmily实现TCC分布式事务实战_公共模块

2345_image_file_copy_412.jpg

持久层的实现

项目公共模块的持久层是转出银行微服务和转入银行微服务共用 的,在逻辑上既实现了转出金额的处理,又实现了转入金额的处理,同时还实现了TCC分布式事务每个阶段执行记录的保存和查询 操作。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserAccountDto implements Serializable {
    private static final long serialVersionUID = 3361105512695088121L;
    /**
     * 自定义事务编号
     */
    private String txNo;
    /**
     * 转出账户
     */
    private String sourceAccountNo;
    /**
     * 转入账户
     */
    private String targetAccountNo;
    /**
     * 金额
     */
    private BigDecimal amount;
}

Dubbo接口的定义

在整个项目的实现过程中,转出银行微服务和转入银行微服务之间 是通过Dubbo实现远程接口调用。因为项目中定义的Dubbo接口需要被转出银行微服务和转入银行微服务同时引用,所以需要将 Dubbo接口放在项目的公共模块。

public interface UserAccountBank02Service {
    /**
     * 转账
     */
    void transferAmountToBank2(UserAccountDto userAccountDto);
}

Hmily实现TCC分布式事务_集成Dubbo框架

2345_image_file_copy_413.jpg 

转入银行微服务对外提供了转入账户的Dubbo接口,当转出银行微 服务调用转入银行微服务的Dubbo接口时,转入银行微服务会执行增加账户余额的操作。

引入Dubbo依赖

       <!-- dubbo依赖 -->
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.7.15</version>
        </dependency>
        <!--ZooKeeper客户端-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.1</version>
        </dependency>
        <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
            <version>5.2.1</version>
        </dependency>

转入银行微服务编写application.yml

server:
 port: 6005
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank02
 registry:
   address: zookeeper://localhost:2181
 protocol:
   name: dubbo
   port: 12345

转出银行微服务编写application.yml

server:
 port: 6004
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank01
 registry:
   address: zookeeper://localhost:2181

编写转入微服务主启动类

@MapperScan("com.itbaizhan.mapper")
@SpringBootApplication
@Slf4j
public class Bank2Main6005 {
    public static void main(String[] args) {
       SpringApplication.run(Bank2Main6005.class,args);
        log.info("************ Bank2Main6005 启动成功 **********");
   }
}

编写转出微服务主启动类

@MapperScan("com.itbaizhan.mapper")
@SpringBootApplication
@Slf4j
public class Bank1Main6004 {
    public static void main(String[] args) {
        SpringApplication.run(Bank1Main6004.class,args);
        log.info("*********** Bank1Main6004*******");
   }
}

业务逻辑层的实现

转出银行微服务的业务逻辑层主要是实现本地账户的金额扣减操作,通过Dubbo框架实现转入银行微服务对应账户余额的增加操作。

转入微服务实现转账功能

@DubboService(version = "1.0.0")
public class UserAccountServicelmpl implements UserAccountBank02Service {
    @Autowired
    private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null ){
            userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
         // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
   }
}

转出微服务实现转账功能

编写转账接口

    /**
     * 跨库转账
     * @param userAccountDTO
     */
    void transferAmountToBank02(UserAccountDTO userAccountDTO);

转出微服务转账接口实现

@Service
public class UserAccountServiceImpl  implements IUserAccountService {
    @DubboReference(version = "1.0.0")
    private UserAccountBank02Service userAccountBank02Service;
    @Autowired
     private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null && userAccount.getAccountBalance().compareTo(userAccountDTO.getBigDecimal()) > 0){
          userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
            // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
        // 4.远程调用转入微服务账户增加金额
        userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }
}

转出微服务编写控制层

@RestController
@RequestMapping("/userAccount")
public class UserAccountController {
    @Autowired
    private IUserAccountService iUserAccountService;
    /**
     * 转账
     * @return
     */
    @GetMapping("/transfer")
    public String transfer(UserAccountDTO userAccountDTO){
        iUserAccountService.transferAmountToBank02(userAccountDTO);
        return "转账成功";
   }
}

Hmily实现TCC事务_集成Hmily框架

2345_image_file_copy_414.jpg

转入和转出微服务引入依赖

         <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>hmily-spring-boot-starter-apache-dubbo</artifactId>
            <version>2.1.1</version>
            <exclusions>
               <exclusion>
                    <groupId>org.dromara</groupId>
                    <artifactId>hmily-repository-mongodb</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

转入微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank02-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password01: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

转出微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank01-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

实现接口上添加注解

TCC模式

2345_image_file_copy_415.jpg

Hmily实现TCC分布式事务_转入转出微服务实现Try阶段

2345_image_file_copy_416.jpg

转出微服务Try阶段

    /**
     * 转账功能
     * @param userAccountDTO
     */
   @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Try 方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
         // 3. 根据账户编号查询账户信息
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        // 4. 判断账户是否存在
        if (userAccount != null){
        // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
        // 更新转账金额
          ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
            // 更新余额
          ulw.set(UserAccount::getAccountBalance,userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getSourceAccountNo());
            baseMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
         // 8. 远程调用 转入微服务 跨库转账的功能
         userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }

转入微服务Try阶段

 /**
     * 跨库转账
     * @param userAccountDTO
     */
    @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的 Try方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
        // 3. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
       // 4. 判断账户是否存在
        if (userAccount != null){
            // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
            // 更新转账金额
            ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getTargetAccountNo());
            userAccountMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
   }

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
516 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
142 4
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
272 0
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
4月前
|
Dubbo Java 应用服务中间件
分布式-dubbo的入门
分布式-dubbo的入门
|
4月前
|
机器学习/深度学习 算法 自动驾驶
深度学习之分布式智能体学习
基于深度学习的分布式智能体学习是一种针对多智能体系统的机器学习方法,旨在通过多个智能体协作、分布式决策和学习来解决复杂任务。这种方法特别适用于具有大规模数据、分散计算资源、或需要智能体彼此交互的应用场景。
244 4
|
5月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
672 2
|
8月前
|
并行计算 算法 物联网
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
|
7月前
|
存储 搜索推荐 Java
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
99 2
|
7月前
|
负载均衡 NoSQL 关系型数据库
Redis分布式锁学习总结
Redis分布式锁学习总结
42 0

热门文章

最新文章