Fescar TC-beigin流程

简介: 开篇 这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。 核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。

开篇

 这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。

 核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。


TC begin 流程说明

  • 1.创建GlobalSession对象,GlobalSession.createGlobalSession()。
  • 2.添加周期监听器到GlobalSession当中,生命周期对象为DefaultSessionManager。
  • 3.启动GlobalSession的周期监听器,添加GlobalSession对象到全局sessionMap对象。
  • 4.启动GlobalSession的周期监听器,持久化GlobalSession对象。


TC begin 源码分析

public class DefaultCore implements Core {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) 
    throws TransactionException {

        // 创建全局GlobalSession对象
        GlobalSession session = GlobalSession.createGlobalSession(
                applicationId, transactionServiceGroup, name, timeout);

        // 全局GlobalSession对象添加生命周期监听器SessionHolder.getRootSessionManager()
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // 启动全局Session对象GlobalSession
        session.begin();

        // 返回新生成的XID返回
        return XID.generateXID(session.getTransactionId());
    }
}

说明:

  • GlobalSession.createGlobalSession()创建全局GlobalSession对象。
  • session.addSessionLifecycleListener()给GlobalSession对象添加生命周期监听器。
  • session.begin()方法通过生命周期监听器保存全局GlobalSession对象。
  • sessionHolder.getRootSessionManager()返回DefaultSessionManager对象。
  • XID.generateXID()创建XID值。


public class GlobalSession implements SessionLifecycle, SessionStorable {
    // 生命周期监听器的容器
    private ArrayList<SessionLifecycleListener> lifecycleListeners 
                                              =  new ArrayList<>();

    public static GlobalSession createGlobalSession(String applicationId, 
              String txServiceGroup, String txName, int timeout) {
        GlobalSession session = 
         new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    public GlobalSession(String applicationId, String transactionServiceGroup, 
                         String transactionName, int timeout) {
        // 生成transactionId对象。
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    // 添加生命周期监听器
    public void addSessionLifecycleListener(
         SessionLifecycleListener sessionLifecycleListener) {
         lifecycleListeners.add(sessionLifecycleListener);
    }

    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
}


// 生成TransactionId的类和方法
public class UUIDGenerator {

    private static AtomicLong UUID = new AtomicLong(1000);
    private static int UUID_INTERNAL = 200000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > 2000000000) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= 2000000000;
                    UUID.set(id);
                }
            }
        }
        return id;
    }
}

说明:

  • GlobalSession构造器内部通过UUIDGenerator.generateUUID()生成transactionId。
  • addSessionLifecycleListener()方法添加生命周期监听器DefaultSessionManager。
  • begin()方法调用生命周期监听器的onBegin()方法(lifecycleListener.onBegin),实现GlobalSession的持久化。


public class SessionHolder {

    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }

    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER == null) {
            throw new ShouldNeverHappenException("SessionManager is NOT init!");
        }
        return ROOT_SESSION_MANAGER;
    }
}

说明:

  • getRootSessionManager()返回DefaultSessionManager对象,实现生命周期接口。


public class DefaultSessionManager extends AbstractSessionManager {

    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {
            @Override
            public boolean writeSession(LogOperation logOperation, 
                                        SessionStorable session) {
                return false;
            }

            @Override
            public void shutdown() {

            }

            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize, 
                boolean isHistory) {
                return null;
            }

            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }
        };
    }
}

public abstract class AbstractSessionManager 
                      implements SessionManager, SessionLifecycleListener {

    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    protected TransactionStoreManager transactionStoreManager;

    protected String name;

    public AbstractSessionManager(String name) {
        this.name = name;
    }

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);

    }

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
}

说明:

  • DefaultSessionManager是GlobalSession的生命周期管理器。
  • DefaultSessionManager的父类AbstractSessionManager实现SessionLifecycleListener接口。
  • DefaultSessionManager的调用父类AbstractSessionManager的onBegin()方法。
  • onBegin()方法内部执行addGlobalSession()方法添加GlobalSession对象。
  • addGlobalSession()方法执行transactionStoreManager.writeSession()执行持久化,自定义的TransactionStoreManager啥都不操作。
  • transactionStoreManager是DefaultSessionManager内生成TransactionStoreManager对象。
  • addGlobalSession()方法执行sessionMap.put()保存GlobalSession对象。
目录
相关文章
|
安全 容器
阿里云最新产品手册——云基础产品与基础设施——计算——弹性容器实例——通用部署ACK虚拟节点组件创建ECI Pot——虚拟节点和弹性容器ECI——安全
阿里云最新产品手册——云基础产品与基础设施——计算——弹性容器实例——通用部署ACK虚拟节点组件创建ECI Pot——虚拟节点和弹性容器ECI——安全自制脑图
286 2
|
20天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34888 53
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
14天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
13606 42
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
10天前
|
人工智能 JavaScript Ubuntu
低成本搭建AIP自动化写作系统:Hermes保姆级使用教程,长文和逐步实操贴图
我带着怀疑的态度,深度使用了几天,聚焦微信公众号AIP自动化写作场景,写出来的几篇文章,几乎没有什么修改,至少合乎我本人的意愿,而且排版风格,也越来越完善,同样是起码过得了我自己这一关。 这个其实OpenClaw早可以实现了,但是目前我觉得最大的区别是,Hermes会自主总结提炼,并更新你的写作技能。 相信就冲这一点,就值得一试。 这篇帖子主要就Hermes部署使用,作一个非常详细的介绍,几乎一步一贴图。 关于Hermes,无论你赞成哪种声音,我希望都是你自己动手行动过,发自内心的选择!
2759 28
|
2天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
|
1月前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
45805 158
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
5天前
|
弹性计算 人工智能 自然语言处理
阿里云Qwen3.6全新开源,三步完成专有版部署!
Qwen3.6是阿里云全新MoE架构大模型系列,稀疏激活显著降低推理成本,兼顾顶尖性能与高性价比;支持多规格、FP8量化、原生Agent及100+语言,开箱即用。
|
8天前
|
人工智能 弹性计算 安全
Hermes Agent是什么?怎么部署?超详细实操教程
Hermes Agent 是 Nous Research 于2026年2月开源的自进化AI智能体,支持跨会话持久记忆、自动提炼可复用技能、多平台接入与200+模型切换,真正实现“越用越懂你”。MIT协议,部署灵活,隐私可控。
2081 4