Fescar example解析 - GlobalTransaction

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 开篇  这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。  这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,实现形式是TM和TC进行网络通信,在整个TM->TC的过程中TM担当了Client端的角色,TC担当了Server端的角色。

开篇

 这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。

这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,实现形式是TM和TC进行网络通信,在整个TM->TC的过程中TM担当了Client端的角色,TC担当了Server端的角色。


背景介绍

事务资料摘自Fescar概览
与XA 的模型类似,我们定义 3 个组件来协议分布式事务的处理过程。

FESCAR Model

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个典型的分布式事务过程:

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
  2. XID 在微服务调用链路的上下文中传播。
  3. RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
  4. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

FESCAR Architecture


执行过程

Fescar TM执行过程

说明,整个执行流程如下:

  • 1.TransactionalTemplate通过GlobalTransactionContext.getCurrentOrCreate()返回GlobalTransaction对象。
  • 2.GlobalTransactionContext的createNew()方法创建DefaultGlobalTransaction对象。
  • 3.DefaultGlobalTransaction的构造方法当中创建DefaultTransactionManager对象。
  • 4.TransactionalTemplate通过DefaultGlobalTransaction执行begin/commit/rollback等操作。
  • 5.DefaultGlobalTransaction内部通过DefaultTransactionManager执行begin/commit/rollback等操作。


源码解析

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. 创建一个GlobalTransaction对象
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. 通过GlobalTransaction开始执行事务
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
}

说明:

  • 创建tx对象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。
  • 执行全局事务,tx.beigin()及其他省略的一部分代码。


public class GlobalTransactionContext {

    private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();

    private GlobalTransactionContext() {
    }

    // 创建GlobalTransaction对象
    private static GlobalTransaction createNew() {
        GlobalTransaction tx = new DefaultGlobalTransaction();
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrent() {
        GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
        if (tx != null) {
            return tx;
        }
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        tx = new DefaultGlobalTransaction(xid);
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
}

说明:

  • createNew()方法创建GlobalTransaction tx对象,类型是DefaultGlobalTransaction。
  • 保存tx到线程当中实现线程隔离,THREAD_TRANSACTION_CONTEXT.set(tx)。
  • GlobalTransaction对象负责执行事务的begin()、commit()、rollback()等方法。


public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    private String xid;

    private GlobalStatus status = GlobalStatus.UnKnown;

    private GlobalTransactionRole role = GlobalTransactionRole.Launcher;

    DefaultGlobalTransaction(String xid) {
        this.transactionManager = DefaultTransactionManager.get();
        this.xid = xid;
        if (xid != null) {
            status = GlobalStatus.Begin;
            role = GlobalTransactionRole.Participant;
        }
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }

    }

    @Override
    public void commit() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.commit(xid);

    }

    @Override
    public void rollback() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.rollback(xid);

    }

    @Override
    public GlobalStatus getStatus() throws TransactionException {
        check();
        status = transactionManager.getStatus(xid);
        return status;
    }
}

说明:

  • DefaultGlobalTransaction构造函数创建transactionManager 对象,this.transactionManager = DefaultTransactionManager.get();
  • DefaultGlobalTransaction的begin/commit/rollback通过TransactionManager的begin/commit/rollback实现。


public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

说明:

  • DefaultTransactionManager 是单例实现全局唯一。
  • DefaultTransactionManager 是TM实现begin/commit/rollback的核心逻辑。
  • DefaultTransactionManager 的begin/commit/rollback通过和TC通信实现。
  • DefaultTransactionManager 的syncCall实现和TC通信。
目录
相关文章
|
关系型数据库 FESCAR 数据库
阿里开源分布式事务解决方案 Fescar 全解析
广为人知的阿里分布式事务解决方案:GTS(Global Transaction Service),已正式推出开源版本,取名为“Fescar”,希望帮助业界解决微服务架构下的分布式事务问题,今天我们一起来深入了解。
25996 0
|
FESCAR 应用服务中间件 Dubbo
Fescar example解析 - TM流程
背景  Fescar 是 阿里巴巴 开源的 分布式事务中间件,以 高效 并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题,介绍可以参考Fescar介绍。  Fescar example解析系列主要是通过阅读Fescar的example源码梳理Fescar的整体逻辑,偏重于整体流程的梳理,让大家在整体上能够有一个宏观了解。
3637 0
|
SQL Dubbo Java
Fescar&Seata分布式事务实现原理解析探秘
前言 fescar发布已有时日,分布式事务一直是业界备受关注的领域,fescar发布一个月左右便受到了近5000个star足以说明其热度。当然,在fescar出来之前,已经有比较成熟的分布式事务的解决方案开源了,比较典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型无侵入事务,目前lcn已发展到5.0,已支持和fescar事务模型类似的TCX型事务。
|
FESCAR
Fescar example解析 - TC流程
开篇  这篇文章主要是梳理TC处理TM发送消息的过程,由于消息种类较多所以打算额外写篇文章分析,这篇文章主要把进入网络层以后的基本流程梳理下,方便大家阅读源码。  这篇文章的没有针对TM的接收部分进行分析,针对收到报文以后的处理流程。
1855 0
|
FESCAR Java
Fescar example解析 - TM发送逻辑
开篇  这篇文章的目的主要是理清楚Fescar的TM发送部分的逻辑,从时序图和源码两个层面进行分析。  文章中间会解答两个自己阅读代码中遇到的困惑(估计大部分人看代码的时候也会遇到这个困惑),包括TmRpcClient的初始化过程和配置加载过程。
1301 0
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
62 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
84 0

推荐镜像

更多
下一篇
无影云桌面