Fescar example解析 - GlobalTransaction

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 开篇  这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。  这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,实现形式是TM和TC进行网络通信,在整个TM->TC的过程中TM担当了Client端的角色,TC担当了Server端的角色。

开篇

 这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。

这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,实现形式是TM和TC进行网络通信,在整个TM->TC的过程中TM担当了Client端的角色,TC担当了Server端的角色。


背景介绍

事务资料摘自Fescar概览
与XA 的模型类似,我们定义 3 个组件来协议分布式事务的处理过程。

FESCAR Model

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个典型的分布式事务过程:

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
  2. XID 在微服务调用链路的上下文中传播。
  3. RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
  4. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

FESCAR Architecture


执行过程

Fescar TM执行过程

说明,整个执行流程如下:

  • 1.TransactionalTemplate通过GlobalTransactionContext.getCurrentOrCreate()返回GlobalTransaction对象。
  • 2.GlobalTransactionContext的createNew()方法创建DefaultGlobalTransaction对象。
  • 3.DefaultGlobalTransaction的构造方法当中创建DefaultTransactionManager对象。
  • 4.TransactionalTemplate通过DefaultGlobalTransaction执行begin/commit/rollback等操作。
  • 5.DefaultGlobalTransaction内部通过DefaultTransactionManager执行begin/commit/rollback等操作。


源码解析

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. 创建一个GlobalTransaction对象
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. 通过GlobalTransaction开始执行事务
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
}

说明:

  • 创建tx对象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。
  • 执行全局事务,tx.beigin()及其他省略的一部分代码。


public class GlobalTransactionContext {

    private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();

    private GlobalTransactionContext() {
    }

    // 创建GlobalTransaction对象
    private static GlobalTransaction createNew() {
        GlobalTransaction tx = new DefaultGlobalTransaction();
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrent() {
        GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
        if (tx != null) {
            return tx;
        }
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        tx = new DefaultGlobalTransaction(xid);
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
}

说明:

  • createNew()方法创建GlobalTransaction tx对象,类型是DefaultGlobalTransaction。
  • 保存tx到线程当中实现线程隔离,THREAD_TRANSACTION_CONTEXT.set(tx)。
  • GlobalTransaction对象负责执行事务的begin()、commit()、rollback()等方法。


public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    private String xid;

    private GlobalStatus status = GlobalStatus.UnKnown;

    private GlobalTransactionRole role = GlobalTransactionRole.Launcher;

    DefaultGlobalTransaction(String xid) {
        this.transactionManager = DefaultTransactionManager.get();
        this.xid = xid;
        if (xid != null) {
            status = GlobalStatus.Begin;
            role = GlobalTransactionRole.Participant;
        }
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }

    }

    @Override
    public void commit() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.commit(xid);

    }

    @Override
    public void rollback() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.rollback(xid);

    }

    @Override
    public GlobalStatus getStatus() throws TransactionException {
        check();
        status = transactionManager.getStatus(xid);
        return status;
    }
}

说明:

  • DefaultGlobalTransaction构造函数创建transactionManager 对象,this.transactionManager = DefaultTransactionManager.get();
  • DefaultGlobalTransaction的begin/commit/rollback通过TransactionManager的begin/commit/rollback实现。


public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

说明:

  • DefaultTransactionManager 是单例实现全局唯一。
  • DefaultTransactionManager 是TM实现begin/commit/rollback的核心逻辑。
  • DefaultTransactionManager 的begin/commit/rollback通过和TC通信实现。
  • DefaultTransactionManager 的syncCall实现和TC通信。
目录
相关文章
|
关系型数据库 FESCAR 数据库
阿里开源分布式事务解决方案 Fescar 全解析
广为人知的阿里分布式事务解决方案:GTS(Global Transaction Service),已正式推出开源版本,取名为“Fescar”,希望帮助业界解决微服务架构下的分布式事务问题,今天我们一起来深入了解。
26020 0
|
FESCAR 应用服务中间件 Dubbo
Fescar example解析 - TM流程
背景  Fescar 是 阿里巴巴 开源的 分布式事务中间件,以 高效 并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题,介绍可以参考Fescar介绍。  Fescar example解析系列主要是通过阅读Fescar的example源码梳理Fescar的整体逻辑,偏重于整体流程的梳理,让大家在整体上能够有一个宏观了解。
3646 0
|
SQL Dubbo Java
Fescar&Seata分布式事务实现原理解析探秘
前言 fescar发布已有时日,分布式事务一直是业界备受关注的领域,fescar发布一个月左右便受到了近5000个star足以说明其热度。当然,在fescar出来之前,已经有比较成熟的分布式事务的解决方案开源了,比较典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型无侵入事务,目前lcn已发展到5.0,已支持和fescar事务模型类似的TCX型事务。
|
FESCAR
Fescar example解析 - TC流程
开篇  这篇文章主要是梳理TC处理TM发送消息的过程,由于消息种类较多所以打算额外写篇文章分析,这篇文章主要把进入网络层以后的基本流程梳理下,方便大家阅读源码。  这篇文章的没有针对TM的接收部分进行分析,针对收到报文以后的处理流程。
1866 0
|
FESCAR Java
Fescar example解析 - TM发送逻辑
开篇  这篇文章的目的主要是理清楚Fescar的TM发送部分的逻辑,从时序图和源码两个层面进行分析。  文章中间会解答两个自己阅读代码中遇到的困惑(估计大部分人看代码的时候也会遇到这个困惑),包括TmRpcClient的初始化过程和配置加载过程。
1306 0
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
93 0
|
27天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

热门文章

最新文章

推荐镜像

更多