阿里开源分布式事务框架seata实践(原fescar) springboot +durid+mybitas+自有rpc框架

简介: 由于系统演进,大佬觉得 需要做微服务,脑子一拍开始对原来的系统进行微服务改造, 在改造过程中,分布式事务不可避免,tcc mq等等概念研究一遍后,结合时间成本,发现阿里gts 比较适合,无奈需要接入外网遂放弃,后来偶然发现seata 开源版gts 尝试接入项目

本文章仅作为seata接入文档,seata原理和源码请自行转至github https://github.com/seata/seata
官方文档地址https://github.com/seata/seata/wiki/Home_Chinese

1 由于系统演进,大佬觉得 需要做微服务,脑子一拍开始对原来的系统进行微服务改造,
在改造过程中,分布式事务不可避免,tcc mq等等概念研究一遍后,结合时间成本,发现阿里gts 比较适合,无奈需要接入外网遂放弃,后来偶然发现seata 开源版gts 尝试接入项目
先放一张流程图
image

接入流程

1 首先去官网git 下载一份源码,我下载的是0.5.2版本。
2 在本地解压加载到idea下载jar包后直接启动server项目中的启动类Server.java ,在调试过程中发现netty存在有时内存不够问题,遂增加启动参数-XX:MaxDirectMemorySize=1024m
3 server 基于netty开发目前只支持单节点启动,内存大小没有进行压力测试,seata配置文件为registry.conf 附上关键配置

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul
  type = "file"
  file {
    name = "file.conf"
  }
}

type 指定配置方式 目前支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
默认配置为file 在接入过程中也使用file 配置类型

file.conf 和registry.conf 目前都在resource目录下
附上file.conf 关键配置

service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
}

接入过程 vgroup_mapping.my_test_tx_group = "default" 修改配置为自定义配置vgroup_mapping.my_group= "default"
具体原则不清楚

4 架构图中的tc搭建完成,下一步搭建RM 也就是微服务的原子系统 引入seata 的jar包

<!--框架问题,指定durid版本-->
<properties>
        <druid.version>1.1.10</druid.version>
        <seata.version>0.5.0</seata.version>
 </properties>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring</artifactId>
    <version>${seata.version}</version>
</dependency>

5 ,按照官方文档,需要将数据源替换为seata数据源,本项目是springboot+durid+mybitas 直接上代码 关键代码 DataSourceProxy proxy = new DataSourceProxy(datasource);

@Configuration
public class DruidConfig {
    @Value("${spring.datasource.url}")
    private String dbUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    @Value("${spring.datasource.initialSize}")
    private int initialSize;

    @Value("${spring.datasource.minIdle}")
    private int minIdle;

    @Value("${spring.datasource.maxActive}")
    private int maxActive;

    @Value("${spring.datasource.maxWait}")
    private int maxWait;

    @Value("${spring.datasource.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.validationQuery}")
    private String validationQuery;

    @Value("${spring.datasource.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.datasource.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.datasource.testOnReturn}")
    private boolean testOnReturn;

    @Value("${spring.datasource.poolPreparedStatements}")
    private boolean poolPreparedStatements;

    @Value("${spring.datasource.filters}")
    private String filters;

    @Value("${mybatis.mapper-locations}")
    private String mapperLocation;
    @Bean
    @Primary
    public DataSource druidDataSource() {
        DruidDataSource datasource = new DruidDataSource();
        datasource.setUrl(this.dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);
        datasource.setMaxWait(maxWait);
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setValidationQuery(validationQuery);
        datasource.setTestWhileIdle(testWhileIdle);
        datasource.setTestOnBorrow(testOnBorrow);
        datasource.setTestOnReturn(testOnReturn);
        datasource.setPoolPreparedStatements(poolPreparedStatements);
        DataSourceProxy proxy = new DataSourceProxy(datasource);
        return proxy;
    }
    @Bean(name="sqlSessionFactory")
    public SqlSessionFactoryBean sqlSessionFactory(DataSource dataSource) throws Exception {

        SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] mapperXmlResource = resolver.getResources(mapperLocation);
        sqlSessionFactory.setDataSource(dataSource);
        sqlSessionFactory.setMapperLocations(mapperXmlResource);
        return sqlSessionFactory;
    }
}

6 新增seata 扫描器配置 直接上代码

@Configuration
public class SeataConfiguration {
    @Value("${spring.application.name}")
    private String applicationId;

    /**
     * 注册一个StatViewServlet
     *
     * @return global transaction scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
                "my_group");
        return globalTransactionScanner;
    }
}

7
新增拦截器过滤器或者切面等,在业务执行前,拦截每个请求,并获取XID并绑定,本人是在service前增加切面,并处理数据
seata目前支持dubbo和springcloud 默认XID放在headers中,由于我们的项目使用的自有的rpc框架,因此需要自己手动获取XID,为了方便
我将XID写在了body中,自己接入的时候,需要按照需要自行设置
上代码


 String xid = RootContext.getXID();
        String restXid = StringUtil.getStringValue(esbInput.getParams().get("Seata-Xid"));
        boolean bind = false;
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(restXid)) {
            RootContext.bind(restXid);
            bind = true;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[" + restXid + "] to RootContext");
            }
        }

try{
                //执行方法体
                object = joinPoint.proceed(args);
            }catch (GeneralException e){//对外接口统一异常捕获解析
               
          }finally {
                if (bind) {
                    String unbindXid = RootContext.unbind();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                    }
                    if (!restXid.equalsIgnoreCase(unbindXid)) {
                        LOGGER.warn("xid in change during http rest from " + restXid + " to " + unbindXid);
                        if (unbindXid != null) {
                            RootContext.bind(unbindXid);
                            LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                        }
                    }
                }
            }

8 增加配置文件 file.conf 和registry.conf 并按照自己的实际情况进行配置,同时按照官方文档,在原子服务数据库新增日志回退表
undo_log 建表语句为


DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=94 DEFAULT CHARSET=utf8;

9 创建TM服务
引入jar包`js

    <seata.version>0.5.0</seata.version>


<groupId>io.seata</groupId>
<artifactId>seata-spring</artifactId>
<version>${seata.version}</version>

`

10
新增scanner 配置 和rm 一致

@Configuration
public class SeataConfiguration {
    @Value("${spring.application.name}")
    private String applicationId;


    /**
     * 注册一个StatViewServlet
     *
     * @return global transaction scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
                "nguc_tx_group");
        return globalTransactionScanner;
    }
}

11 开启事务和事务的提交回滚

private void transactionalRollBack(GlobalTransaction globalTransaction,String xid){
        LOGGER.error("分布式事务中断,事务开始回滚");
        try {
            globalTransaction.rollback();
        } catch (TransactionException txe) {
            LOGGER.error("分布式事务回滚失败,全局事务XID : " + xid);
        }
    }
 public XX doTransaction(){
        GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate();

        //begin GlobalTransactional
        try {
            globalTransaction.begin(20000, "test");
        } catch (TransactionException e) {
            LOGGER.error("全局事务开启失败")
            return outObject;
        }
        String xid = RootContext.getXID();
        //组合服务标示
        try {
            call(input)
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);

            transactionalRollBack(globalTransaction, xid);

        }
        try {
            globalTransaction.rollback();
            LOGGER.error("全局事务提交成功,全局事务XID : " + xid);
        } catch (TransactionException txe) {
            LOGGER.error("全局事务提交失败,全局事务XID : " + xid);
        }
        return xx;
    }

12 通过TM调用rm服务,并测试回滚,可以在commit前添加断点查看undo_log中的数据

相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
2月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
1月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
408 66
|
25天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
72 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
13天前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
50 7
|
1月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
88 2
|
2月前
|
存储 Dubbo Java
分布式 RPC 底层原理详解,看这篇就够了!
本文详解分布式RPC的底层原理与系统设计,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 RPC 底层原理详解,看这篇就够了!
|
1月前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
64 3
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
140 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
70 6

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等