Fescar TC-commit流程

简介: 开篇 这篇文章的目的主要是讲解Fescar TC执行commit的流程,目的是讲解清楚commit流程中的一些步骤。 遗憾的是因为commit本身Fescar的分支事务注册上报,如果事先不了解Fescar的分支事务,有些逻辑理解起来会有一些奇怪,对于branchSession本身还未了解,所以只能单独讲解commit流程。

开篇

 这篇文章的目的主要是讲解Fescar TC执行commit的流程,目的是讲解清楚commit流程中的一些步骤。

 遗憾的是因为commit本身Fescar的分支事务注册上报,如果事先不了解Fescar的分支事务,有些逻辑理解起来会有一些奇怪,对于branchSession本身还未了解,所以只能单独讲解commit流程。


背景

Fescar事务管理
说明:

  • 分支事务中数据的 本地锁 由本地事务管理,在分支事务 Phase1 结束时释放。
    同时,随着本地事务结束,连接 也得以释放。
  • 分支事务中数据的 全局锁 在事务协调器侧管理,在决议 Phase2 全局提交时,全局锁马上可以释放。只有在决议全局回滚的情况下,全局锁 才被持有至分支的 Phase2 结束。

这个设计,极大地减少了分支事务对资源(数据和连接)的锁定时间,给整体并发和吞吐的提升提供了基础。

这里需要重点指出的是:Phase1阶段的commit()操作是各个分支事务本地的事务操作。Phase2阶段的操作是全局的commit()和rollback()。TC-commit流程指的就是Phase2阶段。


TC commit流程介绍

  • 1.根据transactionId查找begin阶段生成的GlobalSession对象。
  • 2.对GlobalSession对象进行清理操作,删除分支事务的锁并清理GlobalSession对象。
  • 3.TC通知所有RM(各分支事务的资源管理器)进行全局提交操作(doGlobalCommit)。


TC commit源码分析

public class DefaultCoordinator extends AbstractTCInboundHandler
    implements TransactionMessageHandler, ResourceManagerInbound {

    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));
    }
}

说明:

  • DefaultCoordinator的doGlobalCommit()作为全局回滚入口
  • core.commit()根据XID去执行全局commit()操作。


Commit 主流程

public class DefaultCore implements Core {
    public GlobalStatus commit(String xid) throws TransactionException {
         // 1.查找GlobalSession
        GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        
        GlobalStatus status = globalSession.getStatus();
        // 2.关闭全局session并执行清理工作
        globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.

        // 3.执行GlobalCommit通知动作
        if (status == GlobalStatus.Begin) {
            if (globalSession.canBeCommittedAsync()) {
                asyncCommit(globalSession);
            } else {
                doGlobalCommit(globalSession, false);
            }

        }

        // 返回GlobalCommit后的状态
        return globalSession.getStatus();
    }
}

说明:

  • DefaultCore是全局回滚的核心逻辑。
  • SessionHolder.findGlobalSession查找全局的GlobalSession对象。
  • GlobalSession执行closeAndClean操作。
  • DefaultCore执行doGlobalCommit通知TC执行全局回滚操作。


查找GlobalSession

public class SessionHolder {
    public static GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
        return getRootSessionManager().findGlobalSession(transactionId);
    }
}

public class DefaultSessionManager extends AbstractSessionManager {}

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
        return sessionMap.get(transactionId);
    }
}

说明:

  • findGlobalSession()方法从DefaultSessionManager当中获取GlobalSession。
  • DefaultSessionManager的父类AbstractSessionManager的findGlobalSession从sessionMap获取GlobalSession对象。


GlobalSession的closeAndClean

public class GlobalSession implements SessionLifecycle, SessionStorable {

    public void closeAndClean() throws TransactionException {
        close();
        clean();
    }

    public void close() throws TransactionException {
        if (active) {
            for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onClose(this);
            }
        }
    }

    private void clean() throws TransactionException {
        for (BranchSession branchSession : branchSessions) {
            branchSession.unlock();
        }
    }
}



public class DefaultSessionManager extends AbstractSessionManager {}
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    public void onClose(GlobalSession globalSession) throws TransactionException {
        globalSession.setActive(false);
    }
}

说明:

  • GlobalSession的执行closeAndClean操作,先执行close再执行clean。
  • lifecycleListener.onClose()执行DefaultSessionManager的onClose()。
  • DefaultSessionManager的onClose()把设置active标识为false。
  • clean()操作对所有的分支事务branchSession释放锁。这部分逻辑比较复杂单独列出。


BranchSession的unlock

public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {

    public boolean unlock() throws TransactionException {
        if (lockHolder.size() == 0) {
            return true;
        }
        Iterator<Map.Entry<Map<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Map<String, Long>, Set<String>> entry = it.next();
            Map<String, Long> bucket = entry.getKey();
            Set<String> keys = entry.getValue();
            synchronized (bucket) {
                for (String key : keys) {
                    Long v = bucket.get(key);
                    if (v == null) {
                        continue;
                    }
                    if (v.longValue() == getTransactionId()) {
                        bucket.remove(key);
                    }
                }
            }
        }
        lockHolder.clear();
        return true;
    }
}

说明:

  • BranchSession的unlock()操作对BranchSession 进行清理。
  • BranchSession内部的数据由于暂未阅读该部分代码所以暂时不能解释清楚。
  • 全局清除lockHolder。


TC执行GlobalCommit

public class DefaultCore implements Core {

    public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        // 遍历所有的BranchSession执行回滚操作
        for (BranchSession branchSession : globalSession.getSortedBranches()) {
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(
                                            branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        continue;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                            continue;
                        } else {
                            globalSession.changeStatus(GlobalStatus.CommitFailed);
                            globalSession.end();
                            LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",
                                globalSession.getTransactionId(), branchSession.getBranchId());
                            return;
                        }
                    default:
                        if (!retrying) {
                            queueToRetryCommit(globalSession);
                            return;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                            continue;
                        } else {
                            LOGGER.error(
                                "Failed to commit global[{}] since branch[{}] commit failed, will retry later.",
                                globalSession.getTransactionId(), branchSession.getBranchId());
                            return;
                        }

                }

            } catch (Exception ex) {
                LOGGER.info("Exception committing branch {}", branchSession, ex);
                if (!retrying) {
                    queueToRetryCommit(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }

            }

        }
        if (globalSession.hasBranch()) {
            return;
        }
        globalSession.changeStatus(GlobalStatus.Committed);
        globalSession.end();
    }
}

说明:

  • 对所有的BranchSession执行branchCommit通知
  • 针对branchCommit返回状态进行判断,有一些逻辑在里面,后续阅读了Branch相关资料后再补充状态转移图。
目录
相关文章
|
4月前
|
4月前
|
监控 关系型数据库 PostgreSQL
两阶段提交(2PC, Two-Phase Commit)
【8月更文挑战第24天】
330 9
|
5月前
|
中间件
|
5月前
|
中间件 数据库
|
7月前
|
存储 Java Nacos
Seata常见问题之xa模式出现错误xid is not valid如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
214 4
|
中间件 Java 调度
Seata两阶段提交AT模式详解
Seata两阶段提交AT模式详解
643 0
Seata两阶段提交AT模式详解
|
SQL JSON Java
Seata分布式事务模式(TA、TCC、XA、SAGA)工作机制
分布式应用有一个比较明显的问题就是,一个业务流程通常需要几个服务来完成,业务的一致性很难保证。为了保障业务一致性,每一步都要在 catch 里去处理前面所有的“回滚”操作,可读性及维护性差,开发效率低下。
459 0
|
SQL 数据库
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
343 0
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
|
SQL Cloud Native NoSQL
分布式事务Seata源码解析九:分支事务如何注册到全局事务
分布式事务Seata源码解析九:分支事务如何注册到全局事务
845 0
分布式事务Seata源码解析九:分支事务如何注册到全局事务
RM在seata AT模式中如何实现分支事务提交或回滚
RM在seata AT模式中如何实现分支事务提交或回滚
461 0