tcc-transaction分布式TCC型事务框架搭建与实战案例(基于Dubbo/Dubbox)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 一定分布式开发经验的朋友都知道,产品/项目/系统最初为了能够快速迭代上线,往往不太注重产品/项目/系统的高可靠性、高性能与高扩展性,采用单体应用和单实例数据库的架构方式快速迭代开发;当产品/项目/系统做到一定规模的时候,原有的系统架构则不足以支撑义务发展需要

一、背景


有一定分布式开发经验的朋友都知道,产品/项目/系统最初为了能够快速迭代上线,往往不太注重产品/项目/系统的高可靠性、高性能与高扩展性,采用单体应用和单实例数据库的架构方式快速迭代开发;当产品/项目/系统做到一定规模的时候,原有的系统架构则不足以支撑义务发展需要,往往相同的业务则需要重复写很多次,导致代码大量冗余,难以维护和扩展,这时不得不对原有产品/项目/系统进行拆分,引入分布式的系统架构;而对原有产品/项目/系统进行拆分的过程中,对于业务和数据的拆分和迁移则成为了最为棘手的问题,尤其是在原有业务不能下线,拆分后的业务同时上线的场景下这种问题更加突出;项目拆分后,业务被拆分为多个独立的子业务分散到多个子系统中,而原有的单一数据库则被拆分到多个数据库中,拆分后的数据库则同样又面临着让人头疼的分布式事务的问题。


本文就针对项目拆分后数据库的分布式事务问题,基于tcc-transaction分布式TCC型事务进行框架的搭建,同时引入相关的实战案例,来解决让人头疼的分布式事务问题。


二、tcc-transaction框架介绍


介绍:tcc-transaction是开源的TCC补偿性分布式事务框架,Git地址:https://github.com/changmingxie/tcc-transaction

TCC为Try、Confirm、Cancel的缩写:try阶段预留资源尝试提交,confirm阶段确定提交,cancel取消提交释放资源。

1.2.x项目指南地址:https://github.com/changmingxie/tcc-transaction/wiki/%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%971.2.x

本文的例子为引入一个本人实际工作中的一个开发场景:创建资产,将资产信息同时同步到Mongo与ES的流程(ES代码不列出了,与mongo类似),整个流程保证数据一致


三、项目流程


1.下载1.2.x版本源码,并可能需要修改部分代码


因为是第三方包,所以需要自己打包到本地仓库。但包中spring版本为3.2.12.RELEASE,如果本地项目为4.x,比如本人的项目spring版本为4.3.4.RELEASE,如果不修改tcc中的spring版本,将报错无法启动,所以需要对原有框架源码进行相应的修改。

源码修改比较简单,如下


1.1 修改tcc-transaction总pom.xml文件

<!-- 第一处:修改版本为4.3.4  -->
<springframework.version>4.3.4.RELEASE</springframework.version>
<!-- 第二处:修改版本为2.2.1  -->
<dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz</artifactId>
      <version>2.2.1</version>
      <exclusions>
          <exclusion>
              <groupId>c3p0</groupId>
              <artifactId>c3p0</artifactId>
          </exclusion>
      </exclusions>
</dependency>
<!-- 第三处:修改版本为2.5.3  -->
<dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>dubbo</artifactId>
       <version>2.5.3</version>
</dependency>


1.2 修改 Java类

tcc-transaction-spring/src/main/java/org/mengyun/tcctransaction/spring/recover/RecoverScheduledJob.java

该文件中 CronTriggerBean类在4.x中已经不存在,也是修改源码主要修改的地方。

修改其中的init方法,修改后如下:

public void init() {
    try {
        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
        jobDetail.setTargetObject(transactionRecovery);
        jobDetail.setTargetMethod("startRecover");
        jobDetail.setName("transactionRecoveryJob");
        jobDetail.setConcurrent(false);
        jobDetail.afterPropertiesSet();
        CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
        cronTrigger.setBeanName("transactionRecoveryCronTrigger");
        cronTrigger.setJobDetail(jobDetail.getObject());
        cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
        cronTrigger.afterPropertiesSet();
        scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
        scheduler.start();
    } catch (Exception e) {
        throw new SystemException(e);
    }
}


各位也可参考如下的修改:https://github.com/changmingxie/tcc-transaction/pull/84/files 


1.3 打包并发布


这里我们通过Maven进行打包发布,命令为:

mvn -Dmaven.test.skip=true install


2.项目依赖


参考1.2.x使用指南,引入两个依赖(本人项目dubbo/dubbox框架,我使用并打包时版本为1.2.3.1)。调用方和提供方都需要引入依赖。


3.加载tcc-transaction.xml配置


原文中是配置在web.xml中,我个人试了一下放在dubbo web项目的web.xml中,但配置并没有被加载。该文件的意义只是希望项目启动时被加载,于是直接在dubbo中的一个spring的配置文件中引入,如下:

<!-- TCC Transaction -->
<import resource="classpath:tcc-transaction.xml" />

该文件里面提供各种aop逻辑,项目启动时扫描指定注解,并做增强。


4.设置TransactionRepository‘


需要为tcc配置数据源,可以是MySQL或其他nosql,本文使用mysql,其他可参见原指南文档。

mysql配置如下:

<!--tcc-->
<bean id="tccDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${jdbc.driverClassName}" />
    <property name="url" value="${jdbc.tcc.url}" />
    <property name="username" value="${jdbc.username}" />
    <property name="password" value="${jdbc.password}" />
    <property name="initialSize" value="${dbcp.initialSize}" />
    <property name="maxActive" value="${dbcp.maxActive}" />
    <property name="maxIdle" value="${dbcp.maxIdle}" />
    <property name="maxWait" value="${dbcp.maxWait}" />
    <property name="poolPreparedStatements" value="${dbcp.poolPreparedStatements}" />
    <property name="defaultAutoCommit" value="${dbcp.defaultAutoCommit}" />
    <property name="timeBetweenEvictionRunsMillis" value="${dbcp.timeBetweenEvictionRunsMillis}" />
    <property name="minEvictableIdleTimeMillis" value="${dbcp.minEvictableIdleTimeMillis}" />
</bean>
<bean id="transactionRepository"
      class="org.mengyun.tcctransaction.spring.repository.SpringJdbcTransactionRepository">
    <property name="dataSource" ref="tccDataSource"/>
    <property name="domain" value="SAAS"/>
    <property name="tbSuffix" value="_ASSET"/>
</bean>
<bean class="org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig">
    <property name="maxRetryCount" value="30"/>
    <property name="recoverDuration" value="120"/>
    <property name="cronExpression" value="0 */1 * * * ?"/>
    <property name="delayCancelExceptions">
        <util:set>
            <value>com.alibaba.dubbo.remoting.TimeoutException</value>
        </util:set>
    </property>
</bean>
       <util:set>            <value>com.alibaba.dubbo.remoting.TimeoutException</value>        </util:set>    </property></bean>


需要注意的点:

1.数据源必须配置新的,不能使用之前项目存在的dataSource的bean,也不能在同一库中,不然会导致tcc表数据与本地事务一起回滚,从而无法保存异常事务日志;

2.注意domain、tbSuffix的配置,这两项文档中并没有配置,但源码demo中配置了,用于数据库的表名称等,推荐配置;

3.最后的DefaultRecoverConfig项是可选的,用于恢复与重试,具体作用参考使用指南;

4.defaultAutoCommit必须为true(默认为true)


5.mysql建表脚本


根据以上的tbSufifix配置,脚本如下:


CREATE TABLE `tcc_transaction_asset` (
  `TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
  `DOMAIN` varchar(100) DEFAULT NULL,
  `GLOBAL_TX_ID` varbinary(32) NOT NULL,
  `BRANCH_QUALIFIER` varbinary(32) NOT NULL,
  `CONTENT` varbinary(8000) DEFAULT NULL,
  `STATUS` int(11) DEFAULT NULL,
  `TRANSACTION_TYPE` int(11) DEFAULT NULL,
  `RETRIED_COUNT` int(11) DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `LAST_UPDATE_TIME` datetime DEFAULT NULL,
  `VERSION` int(11) DEFAULT NULL,
  PRIMARY KEY (`TRANSACTION_ID`),
  UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8


如果表名称不对,启动过程会报错,这时,对数据表做相应调整即可。


6.发布服务(重点)


6.1 dubbo接口
 /**
  * @author binghe
  * 资产相关的业务发布Dubbo服务
  */
public interface AssetCardService {
    /**
     * 测试预保存资产(状态为待确认)
     */
    @Compensable
    int testSaveAssetCard(AssetCardModel model);
    /**
     * 确认保存资产到mysql(状态为正常)
     */
    int confirmMysqlSaveAssetCard(AssetCardModel model);
    /**
     * 取消保存资产到msyql(更新状态为删除)
     */
    int cancelMysqlSaveAssetCard(AssetCardModel model);
    /**
     * 预保存资产到mongo(状态为待确认)
     */
    @Compensable
    void processMongo(AssetCardModel model);
    /**
     * 确认保存资产到mongo(状态为正常)
     */
    void confirmMongoSaveAssetCard(AssetCardModel model);
    /**
     * 取消保存资产到mongo(更新状态为删除)
     */
    void cancelMongoSaveAssetCard(AssetCardModel model);
}


需要注意的点:

1.对外提供服务的接口必须有@Compensable注解;

2.对应的confirm与cancel方法必须声明为接口,不能声明为private,即使是public也不行,必须有接口。


6.2 dubbo接口实现类
/**
* @author binghe
* 资产相关的业务发布Dubbo服务的实现
*/
@Service
@Component
public class AssetCardServiceImpl implements AssetCardService{
    @Override
  @Compensable(confirmMethod = "confirmMysqlSaveAssetCard", cancelMethod = "cancelMysqlSaveAssetCard", transactionContextEditor = DubboTransactionContextEditor.class)
  @Transactional(propagation = Propagation.REQUIRED, rollbackFor = { Exception.class })
  public int testSaveAssetCard(AssetCardModel model){
      // 保存mysql,data状态为-1
      model.setDataStatus(-1);
      assetCardDao.insert(model);
      // mongo处理
      assetCardService.processMongo(model);
      return model.getId();
  }
  @Override
  public int confirmMysqlSaveAssetCard(AssetCardModel model){
      System.out.println("============================================================================");
      System.out.println("=================mysql:confirm");
      System.out.println("============================================================================");
      // 更新mysql data_status为0
      model.setDataStatus(0);
      assetCardDao.updateByPrimaryKey(model);
      return model.getId();
  }
  @Override
  public int cancelMysqlSaveAssetCard(AssetCardModel model){
      System.out.println("============================================================================");
      System.out.println("=================mysql:cancel");
      System.out.println("============================================================================");
      // 更新mysql data_status为-1
      model.setDataStatus(-1);
      assetCardDao.updateByPrimaryKey(model);
      return model.getId();
  }
  @Compensable(confirmMethod = "confirmMongoSaveAssetCard", cancelMethod = "cancelMongoSaveAssetCard", transactionContextEditor = DubboTransactionContextEditor.class)
  @Override
  public void processMongo(AssetCardModel model) {
      // 保存mongo,data_statu为-1
      model.setDataStatus(-1);
      assetCardDaoWrapper.saveMongo(model);
  }
  @Override
  public void confirmMongoSaveAssetCard(AssetCardModel model){
      System.out.println("============================================================================");
      System.out.println("=================mongo:confirm");
      System.out.println("============================================================================");
      // 更新mongo data_status为0
      model.setDataStatus(0);
      assetCardDaoWrapper.updateMongo(model);
  }
  @Override
  public void cancelMongoSaveAssetCard(AssetCardModel model){
      System.out.println("============================================================================");
      System.out.println("=================mongo:cancel");
      System.out.println("============================================================================");
      // 更新mongo data_status为-1
      model.setDataStatus(-1);
      assetCardDao.updateByPrimaryKey(model);
      assetCardDaoWrapper.updateMongo(model);
  }
}


注意点:

1.对外提供服务的接口必须有@Compensable注解,同时必须有confirmMethod、cancelMethod参数的配置,同时dubbo接口额外增加 "transactionContextEditor = DubboTransactionContextEditor.class"这个配置;

2.提供服务接口与对应另外的两个CC方法参数必须完全一致;

3.该tcc框架可嵌套调用,如上在testSaveAssetCard方法,即try阶段中调用了另一个tcc方法"assetCardService.processMongo()",理论上嵌套只应该在try阶段进行;

4.confirm、cancel需要实现幂等性,可能会被重试;5.由于网络等因素,可能导致cancel方法先执行,cancel方法一定要做好相应的判断与处理


6.3 调用方

@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = { Exception.class })
public long testSaveAssetCard(AssetCardModel assetCardModel) throws AssetException {
    assetCardModel.setId(IdGenerator.getId());  
    return assetCardService.testSaveAssetCard(assetCardModel);
}


注意点:

1.因为需要回滚更新等操作,所以此业务中id不能用自增,而是需要项目生成;

2.特别注意,调用方必须在事务中,也就是说必须有事务注解,或者能被事务配置切到,没有事务tcc框架调用时会抛异常。

至此,配置已经全部完成。


7.事务查看


源码中提供tcc-transaction-server web项目,该项目提供界面查看事务日志,打包后部署即可,我们这里就不在作详细的描述。


四、TCC执行流程


业务流程使用记录:

前提:用户下单,建立订单,创建支付记录,支付记录状态为待支付

try:

用户金额冻结

调用积分处理TCC:

try:预增加积分

confirm:更新增加积分状态

cancel:取消增加的积分

confirm:

订单支付状态更新为已支付

订单支付记录支付状态更新为已支付

用户金额扣款(以上三个操作在同一本地事务)

cancel:

判断订单支付状态与订单记录支付状态为未支付

用户冻结金额释放

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
782 66
|
1月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
102 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
1月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
93 7
|
2月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
127 2
|
8天前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
432 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
11天前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
136 83
|
7天前
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
34 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
4月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
20天前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
110 6
Redis,分布式缓存演化之路
|
2月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
234 5