猿创征文|手把手教你微服务分布式事务与Seata框架源码分析(二)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 猿创征文|手把手教你微服务分布式事务与Seata框架源码分析

三,Seata 安装


1,下载seata


官网下载地址: https://seata.io/zh-cn/blog/download.html


2,安装seata


解压seata-server-1.5.2安装包。解压出来我们进入bin目录会看到相应的启动脚本(由于这里是在Windows演示)

1.png

在 Linux/Mac 下

sh ./bin/seata-server.sh

在 Windows 下

bin\seata-server.bat

高可用部署:

Seata 的高可用依赖于注册中心、配置中心和数据库来实现。使用nacos和redis为例

1.png

这里我们看到是基于文件的方式,后面我们会有详细的课程讲解基于nacos注册中心的安装配置等


3,运行seata


  • windows:bin目录下双击seata-server.bat启动
  • linux:命令行启动 seata-server.sh -h 127.0.0.1 -p 8091


启动参数:

  • -h: 注册到注册中心的ip
  • -p: Server rpc 监听端口
  • -m: 全局事务会话信息存储模式,nacos, consul, apollo, zk, etcd3,优先读取启动参数
  • -n: Server node,多个Server时,需区分各自节点,用于生成不同区间的transactionId,以免冲突
  • -e: 环境配置

1.png


四,项目环境搭建


1,增加 Maven 依赖


首先,您需要将 nacos-client 的 Maven 依赖添加到您的项目 pom.xml 文件中

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>最新版</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.2.0及以上版本</version>
</dependency>


2,Client端配置中心


在 application.yml 中加入对应的配置中心,其余配置参考

seata:
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group : "SEATA_GROUP"
      namespace: ""
      username: "XXXXXX"


3,Server端配置中心


在 registry.conf 中加入对应配置中心,其余配置参考

config {
  type = "nacos"
  nacos {
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    username = "nacos"
  }
}


4,上传配置至Nacos配置中心

seata:
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group : "SEATA_GROUP"
      namespace: ""
      dataId: "seataServer.properties"
      username: "nacos"


五,源码分析


今天咱们开始咱们的主题了,在了解以上问题之后我们是不是比较好奇他的运行流程。了解执行流程我们就要从官网,或者源码入手来了解


1,启动类


// 【排除SpringBoot启动时自动加载GlobalTransactionAutoConfiguration.class】
@SpringBootApplication(exclude = {GlobalTransactionAutoConfiguration.class})
@EnableFeignClients   // 远程调用
@EnableDiscoveryClient   // 开启nacos服务
public class GatewayApplication{
   public static void main(String[] args) {
     ErosApplication.run(ProjectNameConstatnt.GATEWAY_NAME,"consume",GatewayApplication.class,args);
  }
}


2,配置类

@GlobalTransactional(name="XXXXX",rollbackFor = RuntimeException.class)


3,@GlobalTransactional


方法上加了@GlobalTransactional,Seata通过aop检测到之后,就会使用TM和TC通信,注册全局事务。

在@GlobalTransactional涵括的代码中,不管是本服务中的sql操作,还是feign调用别的服务的sql操作,只要sql操作满足如下:insert操作,delete操作,update操作,select for update操作。 就会被seata增强,使用RM与TC通信,注册分支事务。

全局事务:包括开启事务、提交、回滚、获取当前状态等方法。

public interface GlobalTransaction {
    /**
     * 开启一个全局事务(使用默认的事务名和超时时间)
     */
    void begin() throws TransactionException;
    /**
     * 开启一个全局事务,并指定超时时间(使用默认的事务名)
     */
    void begin(int timeout) throws TransactionException;
    /**
     * 开启一个全局事务,并指定事务名和超时时间
     */
    void begin(int timeout, String name) throws TransactionException;
    /**
     * 全局提交
     */
    void commit() throws TransactionException;
    /**
     * 全局回滚
     */
    void rollback() throws TransactionException;
    /**
     * 获取事务的当前状态
     */
    GlobalStatus getStatus() throws TransactionException;
    /**
     * 获取事务的 XID
     */
    String getXid();
}

GlobalTransactionScanner继承自AbstractAutoProxyCreator,在这里拦截到加了@GlobalTransactional的方法。

GlobalTransactionScanner

1.png

  • AbstractAutoProxyCreator:wrapIfNecessary(aop的核心),getAdvicesAndAdvisorsForBean(拦截器)
  • InitializingBean:afterPropertiesSet(初始化TM,RM)


3.1 初始化TM,RM

@Override
    public void afterPropertiesSet() {
        //是否禁止了全局事务
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        //初始化netty 客户端(TM  RM)
        initClient();
    }

初始化TM,RM

private void initClient() {  
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        //init RM
        RMClient.init(applicationId, txServiceGroup);
    }

1.png


3.2 TransactionalTemplate


GlobalTransaction 和 GlobalTransactionContext API 把一个业务服务的调用包装成带有分布式事务支持的服务。

public class TransactionalTemplate {
    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
        // 1. 获取当前全局事务实例或创建新的实例
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 2. 开启全局事务
        try {
            tx.begin(business.timeout(), business.name());
        } catch (TransactionException txe) {
            // 2.1 开启失败
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
        Object rs = null;
        try {
            // 3. 调用业务服务
            rs = business.execute();
        } catch (Throwable ex) {
            // 业务调用本身的异常
            try {
                // 全局回滚
                tx.rollback();
                // 3.1 全局回滚成功:抛出原始业务异常
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
            } catch (TransactionException txe) {
                // 3.2 全局回滚失败:
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);
            }
        }
        // 4. 全局提交
        try {
            tx.commit();
        } catch (TransactionException txe) {
            // 4.1 全局提交失败:
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
        return rs;
    }
}

DataSourceProxy#getConnection

@Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

ConnectionProxy#doCommit

private void doCommit() throws SQLException {
    //处理@GlobalTransaction的分支事务
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } 
        //处理@GlobalLock,即检查一下是否可以获取全局锁
    else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

ConnectionProxy#processGlobalTransactionCommit

1.png

模板的异常方法

class ExecutionException extends Exception {
        // 发生异常的事务实例
        private GlobalTransaction transaction;
        // 异常编码:
        // BeginFailure(开启事务失败)
        // CommitFailure(全局提交失败)
        // RollbackFailure(全局回滚失败)
        // RollbackDone(全局回滚成功)
        private Code code;
        // 触发回滚的业务原始异常
        private Throwable originalException;

 

  以上就是我们今天的教程,如果本文对你有所帮助,欢迎关注点赞,分享给您身边的朋友。您的鼓励就是对我的最大动力。

相关文章
|
21天前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
8天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
82 7
|
14天前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
28 1
|
20天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
34 6
|
20天前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
25 6
|
21天前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
126 5
|
18天前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
21 1
|
2月前
|
程序员
后端|一个分布式锁「失效」的案例分析
小猿最近很苦恼:明明加了分布式锁,为什么并发还是会出问题呢?
34 2