Hmily实现TCC分布式事务_项目搭建
创建父工程tx-tcc
设置逻辑工程
<packaging>pom</packaging>
创建公共模块
创建转出银行微服务
创建传入银行微服务
公共模块引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
Hmily实现TCC分布式事务实战_公共模块
持久层的实现
项目公共模块的持久层是转出银行微服务和转入银行微服务共用 的,在逻辑上既实现了转出金额的处理,又实现了转入金额的处理,同时还实现了TCC分布式事务每个阶段执行记录的保存和查询 操作。
@Data @NoArgsConstructor @AllArgsConstructor public class UserAccountDto implements Serializable { private static final long serialVersionUID = 3361105512695088121L; /** * 自定义事务编号 */ private String txNo; /** * 转出账户 */ private String sourceAccountNo; /** * 转入账户 */ private String targetAccountNo; /** * 金额 */ private BigDecimal amount; }
Dubbo接口的定义
在整个项目的实现过程中,转出银行微服务和转入银行微服务之间 是通过Dubbo实现远程接口调用。因为项目中定义的Dubbo接口需要被转出银行微服务和转入银行微服务同时引用,所以需要将 Dubbo接口放在项目的公共模块。
public interface UserAccountBank02Service { /** * 转账 */ void transferAmountToBank2(UserAccountDto userAccountDto); }
Hmily实现TCC分布式事务_集成Dubbo框架
转入银行微服务对外提供了转入账户的Dubbo接口,当转出银行微 服务调用转入银行微服务的Dubbo接口时,转入银行微服务会执行增加账户余额的操作。
引入Dubbo依赖
<!-- dubbo依赖 --> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.7.15</version> </dependency> <!--ZooKeeper客户端--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.1</version> </dependency>
转入银行微服务编写application.yml
server: port: 6005 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02? useUnicode=true&characterEncoding=utf8 username: root password01: 123456 dubbo: scan: base-packages: com.itbaizhan.service application: name: tx-tcc-bank02 registry: address: zookeeper://localhost:2181 protocol: name: dubbo port: 12345
转出银行微服务编写application.yml
server: port: 6004 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02? useUnicode=true&characterEncoding=utf8 username: root password01: 123456 dubbo: scan: base-packages: com.itbaizhan.service application: name: tx-tcc-bank01 registry: address: zookeeper://localhost:2181
编写转入微服务主启动类
@MapperScan("com.tong.mapper") @SpringBootApplication @Slf4j public class Bank2Main6005 { public static void main(String[] args) { SpringApplication.run(Bank2Main6005.class,args); log.info("************ Bank2Main6005 启动成功 **********"); } }
编写转出微服务主启动类
@MapperScan("com.tong.mapper") @SpringBootApplication @Slf4j public class Bank1Main6004 { public static void main(String[] args) { SpringApplication.run(Bank1Main6004.class,args); log.info("*********** Bank1Main6004*******"); } }
业务逻辑层的实现
转出银行微服务的业务逻辑层主要是实现本地账户的金额扣减操作,通过Dubbo框架实现转入银行微服务对应账户余额的增加操作。
转入微服务实现转账功能
@DubboService(version = "1.0.0") public class UserAccountServicelmpl implements UserAccountBank02Service { @Autowired private UserAccountMapper userAccountMapper; @Override public void transferAmountToBank02(UserAccountDTO userAccountDTO) { // 1. 根据账户编号查询账户信息 UserAccount userAccount = userAccountMapper.selectById(userAccountDTO); // 2. 判断账户是否存在 if (userAccount != null ){ userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal())); // 3. 更新账户 userAccountMapper.updateById(userAccount); } } }
转出微服务实现转账功能
编写转账接口
/** * 跨库转账 * @param userAccountDTO */ void transferAmountToBank02(UserAccountDTO userAccountDTO);
转出微服务转账接口实现
@Service public class UserAccountServiceImpl implements IUserAccountService { @DubboReference(version = "1.0.0") private UserAccountBank02Service userAccountBank02Service; @Autowired private UserAccountMapper userAccountMapper; @Override public void transferAmountToBank02(UserAccountDTO userAccountDTO) { // 1. 根据账户编号查询账户信息 UserAccount userAccount = userAccountMapper.selectById(userAccountDTO); // 2. 判断账户是否存在 if (userAccount != null && userAccount.getAccountBalance().compareTo(userAccountDTO.getBigDecimal()) > 0){ userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal())); // 3. 更新账户 userAccountMapper.updateById(userAccount); } // 4.远程调用转入微服务账户增加金额 userAccountBank02Service.transferAmountToBank02(userAccountDTO); } }
转出微服务编写控制层
@RestController @RequestMapping("/userAccount") public class UserAccountController { @Autowired private IUserAccountService iUserAccountService; /** * 转账 * @return */ @GetMapping("/transfer") public String transfer(UserAccountDTO userAccountDTO){ iUserAccountService.transferAmountToBank02(userAccountDTO); return "转账成功"; } }
Hmily实现TCC事务_集成Hmily框架
转入和转出微服务引入依赖
<dependency> <groupId>org.dromara</groupId> <artifactId>hmily-spring-boot-starter-apache-dubbo</artifactId> <version>2.1.1</version> <exclusions> <exclusion> <groupId>org.dromara</groupId> <artifactId>hmily-repository-mongodb</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
转入微服务编写hmily配置文件
在项目的 resource 新建文件名为: hmily.yml 配置文件
hmily: server: configMode: local appName: user-account-bank02-dubbo # 如果server.configMode eq local 的时候才会读取到这里的配置信息. config: appName: user-account-bank01-dubbo serializer: kryo contextTransmittalMode: threadLocal scheduledThreadMax: 16 scheduledRecoveryDelay: 60 scheduledCleanDelay: 60 scheduledPhyDeletedDelay: 600 scheduledInitDelay: 30 recoverDelayTime: 60 cleanDelayTime: 180 limit: 200 retryMax: 10 bufferSize: 8192 consumerThreads: 16 asyncRepository: true autoSql: true phyDeleted: true storeDays: 3 repository: mysql repository: database: driverClassName: com.mysql.cj.jdbc.Driver url : jdbc:mysql://192.168.66.100:3306/hmily? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false&serv erTimezone=UTC username: root password01: 123456 maxActive: 20 minIdle: 10 connectionTimeout: 30000 idleTimeout: 600000 maxLifetime: 1800000
转出微服务编写hmily配置文件
在项目的 resource 新建文件名为: hmily.yml 配置文件
hmily: server: configMode: local appName: user-account-bank01-dubbo # 如果server.configMode eq local 的时候才会读取到这里的配置信息. config: appName: user-account-bank01-dubbo serializer: kryo contextTransmittalMode: threadLocal scheduledThreadMax: 16 scheduledRecoveryDelay: 60 scheduledCleanDelay: 60 scheduledPhyDeletedDelay: 600 scheduledInitDelay: 30 recoverDelayTime: 60 cleanDelayTime: 180 limit: 200 retryMax: 10 bufferSize: 8192 consumerThreads: 16 asyncRepository: true autoSql: true phyDeleted: true storeDays: 3 repository: mysql repository: database: driverClassName: com.mysql.cj.jdbc.Driver url : jdbc:mysql://192.168.66.100:3306/hmily? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false&serv erTimezone=UTC username: root password: 123456 maxActive: 20 minIdle: 10 connectionTimeout: 30000 idleTimeout: 600000 maxLifetime: 1800000
实现接口上添加注解
TCC模式
Hmily实现TCC分布式事务_转入转出微服务实现Try阶段
转出微服务Try阶段
/** * 转账功能 * @param userAccountDTO */ @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel") @Override public void transferAmountToBank02(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank01 的 Try 方法 ,事务id={}",txNo); // 1、 幂等处理 TryLog tryLog = tryLogMapper.selectById(txNo); if (tryLog != null){ return ; } // 2、 悬挂处理 if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){ return ; } // 3. 根据账户编号查询账户信息 UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo()); // 4. 判断账户是否存在 if (userAccount != null){ // 5. 账户金额更新 LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>(); // 更新转账金额 ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal())); // 更新余额 ulw.set(UserAccount::getAccountBalance,userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal())); ulw.eq(UserAccount::getAccountNo,userAccountDTO.getSourceAccountNo()); baseMapper.update(null,ulw); } // 7. 准备阶段记录 TryLog tryLog1 = new TryLog(); tryLog1.setTxNo(txNo); tryLog1.setCreateTime(LocalDateTime.now()); tryLogMapper.insert(tryLog1); // 8. 远程调用 转入微服务 跨库转账的功能 userAccountBank02Service.transferAmountToBank02(userAccountDTO); }
转入微服务Try阶段
/** * 跨库转账 * @param userAccountDTO */ @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel") @Override public void transferAmountToBank02(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank02 的 Try方法 ,事务id={}",txNo); // 1、 幂等处理 TryLog tryLog = tryLogMapper.selectById(txNo); if (tryLog != null){ return ; } // 2、 悬挂处理 if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){ return ; } // 3. 根据账户编号查询账户信息 UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo()); // 4. 判断账户是否存在 if (userAccount != null){ // 5. 账户金额更新 LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>(); // 更新转账金额 ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal())); ulw.eq(UserAccount::getAccountNo,userAccountDTO.getTargetAccountNo()); userAccountMapper.update(null,ulw); } // 7. 准备阶段记录 TryLog tryLog1 = new TryLog(); tryLog1.setTxNo(txNo); tryLog1.setCreateTime(LocalDateTime.now()); tryLogMapper.insert(tryLog1); }
Hmily实现TCC事务_转入转出微服务实现Confirm阶段
编写转出微服务Confirm阶段
/** * 确认阶段 * @param userAccountDTO */ public void sayConfrim(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank01 的 Confrim方法 ,事务id={}",txNo); // 1、幂等处理 ConfirmLog confirmLog = confirmLogMapper.selectById(txNo); if (confirmLog != null){ return ; } // 2、根据账户id查询账户 UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo()); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); baseMapper.updateById(userAccount); // 3、 确认日志记录 ConfirmLog confirmLog1 = new ConfirmLog(); confirmLog1.setTxNo(userAccountDTO.getTxNo()); confirmLog1.setCreateTime(LocalDateTime.now()); confirmLogMapper.insert(confirmLog1); }
编写转入微服务Confirm阶段
/** * 确认阶段 * @param userAccountDTO */ public void sayConfrim(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank02 的Confrim方法 ,事务id={}",txNo); // 1、幂等处理 ConfirmLog confirmLog = confirmLogMapper.selectById(txNo); if (confirmLog != null) { return; } // 2、根据账户id查询账户 UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo()); userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal())); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); userAccountMapper.updateById(userAccount); // 3、 确认日志记录 ConfirmLog confirmLog1 = new ConfirmLog(); confirmLog1.setTxNo(userAccountDTO.getTxNo()); confirmLog1.setCreateTime(LocalDateTime.now()); confirmLogMapper.insert(confirmLog1); }
Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段
转入微服务Cananl阶段
/** * 回滚 * @param userAccountDto */ @Transactional(rollbackFor = Exception.class) public void cancelMethod(UserAccountDto userAccountDto){ String txNo = userAccountDto.getTxNo(); log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto)); CancelLog cancelLog = iCancelLogService.findByTxNo(txNo); if(cancelLog != null){ log.info("bank02已经执行过Cancel方法,txNo:{}", txNo); return; } // 保存记录 iCancelLogService.saveCancelLog(txNo); userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(), userAccountDto.getTargetAccountNo()); }
转出微服务Cancel阶段
/** * 取消阶段 * @param userAccountDTO */ public void sayCancel(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank01 的 Cancel方法 ,事务id={}",txNo); // 1. 幂等处理 CancelLog cancelLog = cancelLogMapper.selectById(txNo); if (cancelLog != null ){ return; } // 2、根据账户id查询账户 UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo()); userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal())); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); baseMapper.updateById(userAccount); // 3、记录回滚日志 CancelLog cancelLog1 = new CancelLog(); cancelLog1.setTxNo(txNo); cancelLog1.setCreateTime(LocalDateTime.now()); cancelLogMapper.insert(cancelLog1); }
最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务
可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。
两种实现方式:
1、基于本地消息表
2、基于支持分布式事务的消息中间件,如RocketMQ等
基本原理
在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。
可靠消息最终一致性分布式事务实现_本地消息表
本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。
库表设计
订单微服务中出库本地消息表:
基础功能
分析
Task微服务的任务