Fescar TC-beigin流程

简介: 开篇 这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。 核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。

开篇

 这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。

 核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。


TC begin 流程说明

  • 1.创建GlobalSession对象,GlobalSession.createGlobalSession()。
  • 2.添加周期监听器到GlobalSession当中,生命周期对象为DefaultSessionManager。
  • 3.启动GlobalSession的周期监听器,添加GlobalSession对象到全局sessionMap对象。
  • 4.启动GlobalSession的周期监听器,持久化GlobalSession对象。


TC begin 源码分析

public class DefaultCore implements Core {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) 
    throws TransactionException {

        // 创建全局GlobalSession对象
        GlobalSession session = GlobalSession.createGlobalSession(
                applicationId, transactionServiceGroup, name, timeout);

        // 全局GlobalSession对象添加生命周期监听器SessionHolder.getRootSessionManager()
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // 启动全局Session对象GlobalSession
        session.begin();

        // 返回新生成的XID返回
        return XID.generateXID(session.getTransactionId());
    }
}

说明:

  • GlobalSession.createGlobalSession()创建全局GlobalSession对象。
  • session.addSessionLifecycleListener()给GlobalSession对象添加生命周期监听器。
  • session.begin()方法通过生命周期监听器保存全局GlobalSession对象。
  • sessionHolder.getRootSessionManager()返回DefaultSessionManager对象。
  • XID.generateXID()创建XID值。


public class GlobalSession implements SessionLifecycle, SessionStorable {
    // 生命周期监听器的容器
    private ArrayList<SessionLifecycleListener> lifecycleListeners 
                                              =  new ArrayList<>();

    public static GlobalSession createGlobalSession(String applicationId, 
              String txServiceGroup, String txName, int timeout) {
        GlobalSession session = 
         new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    public GlobalSession(String applicationId, String transactionServiceGroup, 
                         String transactionName, int timeout) {
        // 生成transactionId对象。
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    // 添加生命周期监听器
    public void addSessionLifecycleListener(
         SessionLifecycleListener sessionLifecycleListener) {
         lifecycleListeners.add(sessionLifecycleListener);
    }

    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
}


// 生成TransactionId的类和方法
public class UUIDGenerator {

    private static AtomicLong UUID = new AtomicLong(1000);
    private static int UUID_INTERNAL = 200000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > 2000000000) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= 2000000000;
                    UUID.set(id);
                }
            }
        }
        return id;
    }
}

说明:

  • GlobalSession构造器内部通过UUIDGenerator.generateUUID()生成transactionId。
  • addSessionLifecycleListener()方法添加生命周期监听器DefaultSessionManager。
  • begin()方法调用生命周期监听器的onBegin()方法(lifecycleListener.onBegin),实现GlobalSession的持久化。


public class SessionHolder {

    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }

    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER == null) {
            throw new ShouldNeverHappenException("SessionManager is NOT init!");
        }
        return ROOT_SESSION_MANAGER;
    }
}

说明:

  • getRootSessionManager()返回DefaultSessionManager对象,实现生命周期接口。


public class DefaultSessionManager extends AbstractSessionManager {

    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {
            @Override
            public boolean writeSession(LogOperation logOperation, 
                                        SessionStorable session) {
                return false;
            }

            @Override
            public void shutdown() {

            }

            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize, 
                boolean isHistory) {
                return null;
            }

            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }
        };
    }
}

public abstract class AbstractSessionManager 
                      implements SessionManager, SessionLifecycleListener {

    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    protected TransactionStoreManager transactionStoreManager;

    protected String name;

    public AbstractSessionManager(String name) {
        this.name = name;
    }

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);

    }

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
}

说明:

  • DefaultSessionManager是GlobalSession的生命周期管理器。
  • DefaultSessionManager的父类AbstractSessionManager实现SessionLifecycleListener接口。
  • DefaultSessionManager的调用父类AbstractSessionManager的onBegin()方法。
  • onBegin()方法内部执行addGlobalSession()方法添加GlobalSession对象。
  • addGlobalSession()方法执行transactionStoreManager.writeSession()执行持久化,自定义的TransactionStoreManager啥都不操作。
  • transactionStoreManager是DefaultSessionManager内生成TransactionStoreManager对象。
  • addGlobalSession()方法执行sessionMap.put()保存GlobalSession对象。
目录
相关文章
|
6月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
中间件 Java 调度
Seata两阶段提交AT模式详解
Seata两阶段提交AT模式详解
642 0
Seata两阶段提交AT模式详解
|
SQL JSON Java
Seata分布式事务模式(TA、TCC、XA、SAGA)工作机制
分布式应用有一个比较明显的问题就是,一个业务流程通常需要几个服务来完成,业务的一致性很难保证。为了保障业务一致性,每一步都要在 catch 里去处理前面所有的“回滚”操作,可读性及维护性差,开发效率低下。
459 0
|
中间件 uml
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
387 7
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
|
SQL 缓存 关系型数据库
阿里中间件seata源码剖析四:AT模式2阶段提交
阿里中间件seata源码剖析四:AT模式2阶段提交
319 9
阿里中间件seata源码剖析四:AT模式2阶段提交
|
SQL JSON 算法
【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
750 1
【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
|
SQL Cloud Native Java
分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
695 0
分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
|
SQL 数据库
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
343 0
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
|
SQL Cloud Native NoSQL
分布式事务Seata源码解析九:分支事务如何注册到全局事务
分布式事务Seata源码解析九:分支事务如何注册到全局事务
845 0
分布式事务Seata源码解析九:分支事务如何注册到全局事务
RM在seata AT模式中如何实现分支事务提交或回滚
RM在seata AT模式中如何实现分支事务提交或回滚
457 0