分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)



Hmily实现TCC分布式事务_项目搭建

创建父工程tx-tcc

设置逻辑工程

<packaging>pom</packaging>

创建公共模块

创建转出银行微服务

创建传入银行微服务

公共模块引入依赖

<dependencies>
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

Hmily实现TCC分布式事务实战_公共模块

持久层的实现

项目公共模块的持久层是转出银行微服务和转入银行微服务共用 的,在逻辑上既实现了转出金额的处理,又实现了转入金额的处理,同时还实现了TCC分布式事务每个阶段执行记录的保存和查询 操作。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserAccountDto implements Serializable {
    private static final long serialVersionUID = 3361105512695088121L;
    /**
     * 自定义事务编号
     */
    private String txNo;
    /**
     * 转出账户
     */
    private String sourceAccountNo;
    /**
     * 转入账户
     */
    private String targetAccountNo;
    /**
     * 金额
     */
    private BigDecimal amount;
}

Dubbo接口的定义

在整个项目的实现过程中,转出银行微服务和转入银行微服务之间 是通过Dubbo实现远程接口调用。因为项目中定义的Dubbo接口需要被转出银行微服务和转入银行微服务同时引用,所以需要将 Dubbo接口放在项目的公共模块。

public interface UserAccountBank02Service {
    /**
     * 转账
     */
    void transferAmountToBank2(UserAccountDto userAccountDto);
}

Hmily实现TCC分布式事务_集成Dubbo框架

转入银行微服务对外提供了转入账户的Dubbo接口,当转出银行微 服务调用转入银行微服务的Dubbo接口时,转入银行微服务会执行增加账户余额的操作。

引入Dubbo依赖

<!-- dubbo依赖 -->
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.7.15</version>
        </dependency>
        <!--ZooKeeper客户端-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.1</version>
        </dependency>
        <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
            <version>5.2.1</version>
        </dependency>

转入银行微服务编写application.yml

server:
 port: 6005
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank02
 registry:
   address: zookeeper://localhost:2181
 protocol:
   name: dubbo
   port: 12345

转出银行微服务编写application.yml

server:
 port: 6004
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank01
 registry:
   address: zookeeper://localhost:2181

编写转入微服务主启动类

@MapperScan("com.tong.mapper")
@SpringBootApplication
@Slf4j
public class Bank2Main6005 {
    public static void main(String[] args) {
       SpringApplication.run(Bank2Main6005.class,args);
        log.info("************ Bank2Main6005 启动成功 **********");
   }
}

编写转出微服务主启动类

@MapperScan("com.tong.mapper")
@SpringBootApplication
@Slf4j
public class Bank1Main6004 {
    public static void main(String[] args) {
        SpringApplication.run(Bank1Main6004.class,args);
        log.info("*********** Bank1Main6004*******");
   }
}

业务逻辑层的实现

转出银行微服务的业务逻辑层主要是实现本地账户的金额扣减操作,通过Dubbo框架实现转入银行微服务对应账户余额的增加操作。

转入微服务实现转账功能

@DubboService(version = "1.0.0")
public class UserAccountServicelmpl implements UserAccountBank02Service {
    @Autowired
    private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null ){
            userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
         // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
   }
}

转出微服务实现转账功能

编写转账接口

/**
     * 跨库转账
     * @param userAccountDTO
     */
    void transferAmountToBank02(UserAccountDTO userAccountDTO);

转出微服务转账接口实现

@Service
public class UserAccountServiceImpl  implements IUserAccountService {
    @DubboReference(version = "1.0.0")
    private UserAccountBank02Service userAccountBank02Service;
    @Autowired
     private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null && userAccount.getAccountBalance().compareTo(userAccountDTO.getBigDecimal()) > 0){
          userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
            // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
        // 4.远程调用转入微服务账户增加金额
        userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }
}

转出微服务编写控制层

@RestController
@RequestMapping("/userAccount")
public class UserAccountController {
    @Autowired
    private IUserAccountService iUserAccountService;
    /**
     * 转账
     * @return
     */
    @GetMapping("/transfer")
    public String transfer(UserAccountDTO userAccountDTO){
        iUserAccountService.transferAmountToBank02(userAccountDTO);
        return "转账成功";
   }
}

Hmily实现TCC事务_集成Hmily框架

转入和转出微服务引入依赖

<dependency>
            <groupId>org.dromara</groupId>
            <artifactId>hmily-spring-boot-starter-apache-dubbo</artifactId>
            <version>2.1.1</version>
            <exclusions>
               <exclusion>
                    <groupId>org.dromara</groupId>
                    <artifactId>hmily-repository-mongodb</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

转入微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank02-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password01: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

转出微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank01-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

实现接口上添加注解

TCC模式

Hmily实现TCC分布式事务_转入转出微服务实现Try阶段

转出微服务Try阶段

/**
     * 转账功能
     * @param userAccountDTO
     */
   @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Try 方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
         // 3. 根据账户编号查询账户信息
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        // 4. 判断账户是否存在
        if (userAccount != null){
        // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
        // 更新转账金额
          ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
            // 更新余额
          ulw.set(UserAccount::getAccountBalance,userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getSourceAccountNo());
            baseMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
         // 8. 远程调用 转入微服务 跨库转账的功能
         userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }

转入微服务Try阶段

/**
     * 跨库转账
     * @param userAccountDTO
     */
    @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的 Try方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
        // 3. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
       // 4. 判断账户是否存在
        if (userAccount != null){
            // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
            // 更新转账金额
            ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getTargetAccountNo());
            userAccountMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
   }

Hmily实现TCC事务_转入转出微服务实现Confirm阶段

编写转出微服务Confirm阶段

/**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null){
            return ;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

编写转入微服务Confirm阶段

/**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null) {
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
      userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
      userAccountMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段

转入微服务Cananl阶段

/**
     * 回滚
     * @param userAccountDto
     */
    @Transactional(rollbackFor = Exception.class)
    public void cancelMethod(UserAccountDto userAccountDto){
        String txNo = userAccountDto.getTxNo();
        log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto));
        CancelLog cancelLog = iCancelLogService.findByTxNo(txNo);
        if(cancelLog != null){
            log.info("bank02已经执行过Cancel方法,txNo:{}", txNo);
            return;
       }
        // 保存记录
       iCancelLogService.saveCancelLog(txNo);
       userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(),
       userAccountDto.getTargetAccountNo());
   }

转出微服务Cancel阶段

/**
     * 取消阶段
     * @param userAccountDTO
     */
    public void sayCancel(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Cancel方法 ,事务id={}",txNo);
        // 1. 幂等处理
        CancelLog cancelLog = cancelLogMapper.selectById(txNo);
        if (cancelLog != null ){
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、记录回滚日志
        CancelLog cancelLog1 = new CancelLog();
        cancelLog1.setTxNo(txNo);
        cancelLog1.setCreateTime(LocalDateTime.now());
        cancelLogMapper.insert(cancelLog1);
   }

最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务

可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。

两种实现方式:

1、基于本地消息表

2、基于支持分布式事务的消息中间件,如RocketMQ等

基本原理

在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。

可靠消息最终一致性分布式事务实现_本地消息表

本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。

库表设计

订单微服务中出库本地消息表:

基础功能

分析

Task微服务的任务

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
9月前
|
消息中间件 算法 调度
分布式系统学习10:分布式事务
本文是小卷关于分布式系统架构学习系列的第13篇,重点探讨了分布式事务的相关知识。随着业务增长,单体架构拆分为微服务后,传统的本地事务无法满足需求,因此需要引入分布式事务来保证数据一致性。文中详细介绍了分布式事务的必要性、实现方案及其优缺点,包括刚性事务(如2PC、3PC)和柔性事务(如TCC、Saga、本地消息表、MQ事务、最大努力通知)。同时,还介绍了Seata框架作为开源的分布式事务解决方案,提供了多种事务模式,简化了分布式事务的实现。
369 5
|
10月前
|
存储 缓存 负载均衡
一致性哈希:解决分布式难题的神奇密钥
一致哈希是一种特殊的哈希算法,用于分布式系统中实现数据的高效、均衡分布。它通过将节点和数据映射到一个虚拟环上,确保在节点增减时只需重定位少量数据,从而提供良好的负载均衡、高扩展性和容错性。相比传统取模方法,一致性哈希能显著减少数据迁移成本,广泛应用于分布式缓存、存储、数据库及微服务架构中,有效提升系统的稳定性和性能。
577 1
|
12月前
|
消息中间件 存储 算法
分布式系列第二弹:分布式事务!
分布式系列第二弹:分布式事务!
|
12月前
|
消息中间件 缓存 算法
分布式系列第一弹:分布式一致性!
分布式系列第一弹:分布式一致性!
237 0
|
2月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
191 2
|
2月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
100 0
|
3月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
19天前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
7月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
610 0
分布式爬虫框架Scrapy-Redis实战指南

热门文章

最新文章