Fescar TC-rollback流程

简介: 开篇 这篇文章的目的主要是讲解Fescar TC执行rollback的流程,目的是讲解清楚rollback流程中的一些步骤。 遗憾的是因为rollback本身涉及Fescar的分支事务注册上报,如果事先不了解Fescar的分支事务,有些逻辑理解起来会有一些奇怪,对于branchSession本身还未了解,所以只能单独讲解rollback流程。

开篇

 这篇文章的目的主要是讲解Fescar TC执行rollback的流程,目的是讲解清楚rollback流程中的一些步骤。

 遗憾的是因为rollback本身涉及Fescar的分支事务注册上报,如果事先不了解Fescar的分支事务,有些逻辑理解起来会有一些奇怪,对于branchSession本身还未了解,所以只能单独讲解rollback流程。


背景

Fescar事务管理
说明:

  • 分支事务中数据的 本地锁 由本地事务管理,在分支事务 Phase1 结束时释放。
    同时,随着本地事务结束,连接 也得以释放。
  • 分支事务中数据的 全局锁 在事务协调器侧管理,在决议 Phase2 全局提交时,全局锁马上可以释放。只有在决议全局回滚的情况下,全局锁 才被持有至分支的 Phase2 结束。

这个设计,极大地减少了分支事务对资源(数据和连接)的锁定时间,给整体并发和吞吐的提升提供了基础。

这里需要重点指出的是:Phase1阶段的commit()操作是各个分支事务本地的事务操作。Phase2阶段的操作是全局的commit()和rollback()。TC-rollback流程指的就是Phase2阶段。


TC rollback流程介绍

rollback主流程

  • 1.根据transactionId查找begin阶段生成的GlobalSession对象。
  • 2.对GlobalSession对象进行close操作。
  • 3.TC通知所有RM(各分支事务的资源管理器)进行全局回滚操作(doGlobalRollback)。


TC rollback源码分析

public class DefaultCore implements Core {
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 查找全局GlobalSession对象
        GlobalSession globalSession = 
         SessionHolder.findGlobalSession(XID.getTransactionId(xid));

        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        GlobalStatus status = globalSession.getStatus();
        // Highlight: Firstly, close the session, 
        // then no more branch can be registered.
    
        // 关闭全局的GlobalSession对象。
        globalSession.close(); 

        if (status == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            // 执行全局的rollback操作
            doGlobalRollback(globalSession, false);

        }
        return globalSession.getStatus();
    }
}

说明:

  • 查找全局的GlobalSession对象。
  • 关闭GlobalSession对象。
  • 执行全局的rollback操作。


GlobalSession关闭操作

public class GlobalSession implements SessionLifecycle, SessionStorable {

    public void close() throws TransactionException {
        if (active) {
            for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onClose(this);
            }
        }
    }

    public void onEnd(GlobalSession globalSession) throws TransactionException {
        removeGlobalSession(globalSession);
    }

    public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
        updateGlobalSessionStatus(globalSession, status);
    }

    public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
        transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
    }

    public void changeStatus(GlobalStatus status) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onStatusChange(this, status);
        }
        this.status = status;
    }

}
  • GlobalSession的close操作调用生命周期监听器lifecycleListener.onClose()。
  • lifecycleListener指的是DefaultSessionManager对象。


DefaultSessionManager操作

public class SessionHolder {
    public static GlobalSession findGlobalSession(Long transactionId) 
      throws TransactionException {
        return getRootSessionManager().findGlobalSession(transactionId);
    }
}

public class DefaultSessionManager extends AbstractSessionManager {}

public abstract class AbstractSessionManager implements 
             SessionManager, SessionLifecycleListener {

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    public GlobalSession findGlobalSession(Long transactionId) 
      throws TransactionException {
        return sessionMap.get(transactionId);
    }

    public void onClose(GlobalSession globalSession) throws TransactionException {
        globalSession.setActive(false);

    }

    public void removeGlobalSession(GlobalSession session) 
                throws TransactionException {
        transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
        sessionMap.remove(session.getTransactionId());

    }
}

说明:

  • 生命周期监听器的onClose()设置GlobalSession对象的active状态为false。
  • findGlobalSession()方法从DefaultSessionManager返回GlobalSession对象。


public class DefaultCore implements Core {

   public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        globalSession.removeBranch(branchSession);
                        LOGGER.error("Successfully rolled back branch " + branchSession);
                        continue;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        changeToRollbackFailedStatus(globalSession);
                        globalSession.end();
                        LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                        return;
                    default:
                        LOGGER.info("Failed to rollback branch " + branchSession);
                        if (!retrying) {
                            queueToRetryRollback(globalSession);
                        }
                        return;

                }
            } catch (Exception ex) {
                LOGGER.info("Exception rollbacking branch " + branchSession, ex);
                if (!retrying) {
                    queueToRetryRollback(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }

            }

        }
        if (globalSession.hasBranch()) {
            changeToRollbackFailedStatus(globalSession);
        } else {
            changeToRollbackedStatus(globalSession);
        }
        globalSession.end();
    }
}

说明:

  • doGlobalRollback()遍历GlobalSession当中所有的branchSession执行回滚操作。
  • 内部涉及到GlobalSession的状态迁移,这部分后面统一通过状态迁移实现。
目录
相关文章
|
13天前
|
人工智能 自然语言处理 文字识别
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
Qwen3.7-Max是阿里云百炼面向智能体时代推出的新一代旗舰模型,对标GPT-5.5、Claude Opus 4.7等闭源旗舰。该模型支持百万级token上下文窗口,具备顶级推理能力、多模态搜索与视觉理解增强、流式输出低延迟响应等核心优势,覆盖编程、办公、长周期自主执行等复杂场景。同时支持OpenAI接口兼容,便于系统快速迁移。用户可通过Token Plan团队或节省计划等订阅方式灵活调用,适合企业级高要求场景使用。
5215 27
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
|
8天前
|
存储 定位技术 数据库
CodeGraph 如何让 Claude Code减少 7 成工具调用?
CodeGraph 为 Coding Agent 提供本地代码知识图谱,把函数、类、调用链和框架路由提前整理成“项目地图”,减少盲目搜索和文件读取。它不是新 Agent,而是上下文基础设施,让 Agent 更快找到正确代码路径,平均减少 7 成工具调用。
1033 0
|
15天前
|
人工智能 自然语言处理 供应链
|
5天前
|
人工智能 安全 定位技术
CodeGraph深度解析 让Claude Code工具调用直降七成的核心原理与实操教程
如今以Claude Code为代表的AI编程智能体已经成为开发者日常编码、项目重构、漏洞修复的必备工具。但在长期使用过程中,几乎所有开发者都会遇到同一个明显痛点:AI虽然具备强大的代码生成与分析能力,却常常陷入盲目探索的循环中。
704 1
|
21天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
3774 15
|
17天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
3433 10
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek