源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的

简介: 源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的

前言

在之前的博客中,已经介绍过了TM在seata AT模式中的处理流程TC在seata分布式事务中的主要任务以及RM在seata AT模式中的sql语句执行流程,下面我们通过源码讲述分布式事务是如何实现提交或回滚的。

分支事务的提交或回滚

在seata AT模式中,只有当所有的分支事务全部成功提交后,才会触发分布式事务的提交:

public class ConnectionProxy extends AbstractConnectionProxy {
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 注册分支事务,添加行锁,其实就是分布式锁
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
          // 插入undolog
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 提交本地事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 上报分支事务提交失败状态
            report(false);
            // 抛出异常,最终会被TM捕捉到,触发分布式事务的回滚
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 上报分支事务成功提交状态
            report(true);
        }
        // 恢复现场
        context.reset();
    }
}
复制代码

在上述源码分析中,当所有的RM分支事务提交成功后,TC会接收到所有RM分支事务的状态,代码最终会执行到TM的模版逻辑中。

TM触发分布式事务的提交或回滚


image.png

TM模版代码:

public Object execute(TransactionalExecutor business) throws Throwable {
            // 1. 拿到整理好的@GlobalTransactional注解里面的配置信息
            TransactionInfo txInfo = business.getTransactionInfo();
            if (txInfo == null) {
                throw new ShouldNeverHappenException("transactionInfo does not exist");
            }
            // 1.1 获取当前的分布式事务,如果为null的话,说明这是分布式事务的发起者;如果不为null,说明这是分布式事务的参与者
            GlobalTransaction tx = GlobalTransactionContext.getCurrent();
            // 1.2 获取分布式事务的传播级别,其实就是按照spring的传播级别来一套,区别就是spring事务是本地事务,这是分布式事务,原理都一样
            Propagation propagation = txInfo.getPropagation();
            SuspendedResourcesHolder suspendedResourcesHolder = null;
            try {
                // 这个switch里面全都是处理分布式事务传播级别的
                switch (propagation) {
                    // 如果不支持分布式事务,如果当前存在事务,那么先挂起当前的分布式事务,再执行业务逻辑
                    case NOT_SUPPORTED:
                        // 分布式事务存在,先挂起
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                        }
                        // 执行业务逻辑
                        return business.execute();
                    // 如果是每次都要创建一个新的分布式事务,先把当前存在的分布式事务挂起,然后创建一个新分布式事务
                    case REQUIRES_NEW:
                        // 如果分布式事务存在,先挂起当前分布式事务,再创建一个新的分布式事务
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                            tx = GlobalTransactionContext.createNew();
                        }
                        // 之所以用break,是为了后面的代码和其他的传播级别一起共用,业务逻辑肯定还是要执行的
                        break;
                    // 如果支持分布式事务,如果当前不存在分布式事务,那么直接执行业务逻辑,否则以分布式事务的方式执行业务逻辑
                    case SUPPORTS:
                        // 如果不存在分布式事务,直接执行业务逻辑
                        if (notExistingTransaction(tx)) {
                            return business.execute();
                        }
                        // 否则,以分布式事务的方式执行业务逻辑
                        break;
                    // 如果有分布式事务,就在当前分布式事务下执行业务逻辑,否则创建一个新的分布式事务执行业务逻辑
                    case REQUIRED:
                        // If current transaction is existing, execute with current transaction,
                        // else continue and execute with new transaction.
                        break;
                    // 如果不允许有分布式事务,那么一旦发现存在分布式事务,直接抛异常;只有不存在分布式事务的时候才正常执行
                    case NEVER:
                        // 存在分布式事务,抛异常
                        if (existingTransaction(tx)) {
                            throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                        , tx.getXid()));
                        } else {
                            // 不存在分布式事务,执行业务逻辑
                            return business.execute();
                        }
                    // 一定要有分布式事务,分布式事务不存在的话,抛异常;
                    case MANDATORY:
                        // 不存在分布式事务,抛异常
                        if (notExistingTransaction(tx)) {
                            throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                        }
                        // Continue and execute with current transaction.
                        break;
                    default:
                        throw new TransactionException("Not Supported Propagation:" + propagation);
                }
                // 上面的传播级别的逻辑处理完毕,下面就是公共的处理逻辑
                // 1.3 如果当前分布式事务没有的话,那么我们就要创建新的分布式事务,此时我们就是分布式事务的发起者,也就是TM本身,否则不能称之为`TM`
                if (tx == null) {
                    tx = GlobalTransactionContext.createNew();
                }
                // 开始准备干活的条件
                // 把我们这个方法的全局锁配置放进当前线程中,并且把线程中已有的全局锁的配置取出来
                // 我们在干完自己的活后,会把这个取出来的配置放回去的
                GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
                try {
                    // 2. 如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干
                    // 这个XID可以从RootContext中获取
                    beginTransaction(txInfo, tx);
                    Object rs;
                    try {
                        // 执行业务逻辑
                        rs = business.execute();
                    } catch (Throwable ex) {
                        // 3. 发生任何异常,我们准备启动回滚机制
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
                    // 4. 一切顺利,通知提交分布式事务
                    commitTransaction(tx);
                    return rs;
                } finally {
                    //5. 恢复现场,把之前的配置放回去
                    resumeGlobalLockConfig(previousConfig);
                    // 触发回调
                    triggerAfterCompletion();
                    // 清理工作
                    cleanUp();
                }
            } finally {
                // 恢复之前挂起的事务
                if (suspendedResourcesHolder != null) {
                    tx.resume(suspendedResourcesHolder);
                }
            }
        }
复制代码

主要关注这一段代码:

try {
                    // 2. 如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干
                    // 这个XID可以从RootContext中获取
                    beginTransaction(txInfo, tx);
                    Object rs;
                    try {
                        // 执行业务逻辑
                        rs = business.execute();
                    } catch (Throwable ex) {
                        // 3. 发生任何异常,我们准备启动回滚机制
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
                    // 4. 一切顺利,通知提交分布式事务
                    commitTransaction(tx);
                    return rs;
                } finally {
                    //5. 恢复现场,把之前的配置放回去
                    resumeGlobalLockConfig(previousConfig);
                    // 触发回调
                    triggerAfterCompletion();
                    // 清理工作
                    cleanUp();
                }
            } finally {
                // 恢复之前挂起的事务
                if (suspendedResourcesHolder != null) {
                    tx.resume(suspendedResourcesHolder);
                }
            }
复制代码

当任意RM分支事务产生异常后,都会触发TM执行completeTransactionAfterThrowing(txInfo, tx, ex),只有当所有的RM分支事务提交成功后,TM才会发起commitTransaction(tx)

  • 异常回滚
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // 如果异常类型和指定的类型一致,那么发起回滚;不一致还是要提交分布式事务
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // 回滚分布式事务
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // 回滚失败抛异常
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // 不是指定的异常类型,还是继续提交分布式事务
            commitTransaction(tx);
        }
    }
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        // 执行回调,默认空回调
        triggerBeforeRollback();
        // 回滚
        tx.rollback();
        // 执行回调,默认空回调
        triggerAfterRollback();
        // 就算回滚没问题,照样抛异常,目的应该是告知开发人员此处产生了回滚
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }
复制代码
@Override
    public void rollback() throws TransactionException {
        // 如果是分布式事务参与者,那么啥也不做,RM的回滚不在这里,这是TM的回滚
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 下面就是一个循环重试发起分布式事务回滚
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 发起回滚的核心代码
                    status = transactionManager.rollback(xid);
                    // 回滚成功跳出循环
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重试失败次数完成才会跳出循环
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // 如果回滚的分布式事务就是当前的分布式事务,那么从当前线程中解绑XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 准备发起请求给TC,回滚指定的分布式事务
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }
复制代码

分布式事务回滚逻辑中有以下几个点:

  1. 触发回滚需要产生的异常和注解中指定的异常一致才会发起回滚,否则还是继续提交;
  2. 回滚是可以设置重试次数的,只有重试都失败了,才会导致回滚失败,否则只要有一次成功,那么回滚就成功;
  3. TM发起的回滚其实只是和TC发起一次分布式事务回滚的通信,并没有数据库的操作;
  • 分布式事务提交
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
            try {
                // 回调,默认空回调
                triggerBeforeCommit();
                // 分布式事务提交
                tx.commit();
                // 回调,默认空回调
                triggerAfterCommit();
            } catch (TransactionException txe) {
                // 4.1 提交出异常,提交失败
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
            }
        }
    @Override
        public void commit() throws TransactionException {
            // 如果只是分布式事务参与者,那么啥也不干,TM只能有一个,哈哈
            if (role == GlobalTransactionRole.Participant) {
                // Participant has no responsibility of committing
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
                }
                return;
            }
            assertXIDNotNull();
            // 分布式事务提交也是有重试的
            int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
            try {
                while (retry > 0) {
                    try {
                        retry--;
                        // 发起分布式事务提交
                        status = transactionManager.commit(xid);
                        // 提交成功跳出循环
                        break;
                    } catch (Throwable ex) {
                        LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                        // 重试结束,依然失败就抛异常
                        if (retry == 0) {
                            throw new TransactionException("Failed to report global commit", ex);
                        }
                    }
                }
            } finally {
                // 如果提交的分布式事务就是当前事务,那么需要清理当前线程中的XID
                if (xid.equals(RootContext.getXID())) {
                    suspend();
                }
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("[{}] commit status: {}", xid, status);
            }
        }
    @Override
        public GlobalStatus commit(String xid) throws TransactionException {
            // 发起分布式事务提交请求,这是与TC通信
            GlobalCommitRequest globalCommit = new GlobalCommitRequest();
            globalCommit.setXid(xid);
            GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
            return response.getGlobalStatus();
        }
复制代码
  1. 分布式事务回滚也是可以设置重试次数的;
  2. 分布式事务提交其实也是TM与TC进行通信,告知TC这个XID对应的分布式事务可以提交了;

TC发起分布式事务提交或回滚

TC通过io.seata.server.coordinator.DefaultCore发起分布式事务提交:

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            // 直接从这里开始看,这里有一个forEach循环遍历分布式事务中的所有分支事务
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }
                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                // 前面都是一些状态的校验,下面开始发起分支事务的提交
                try {
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    // 下面代码可忽略
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        // 如果该分支事务已经提交,说明提交成功了,那么移除该分支
                        case PhaseTwo_Committed:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        // 如果该分支事务已经提交失败,并且无法重试,那么打上标记
                        case PhaseTwo_CommitFailed_Unretryable:
                            //not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            // 进入重试队列
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            // 如果可以异步提交,那就直接跳过,定时任务会执行
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                // 剩下的就是提交失败的
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        // 出了异常,放入队列继续等待提交
                        globalSession.queueToRetryCommit();
                        // 抛出异常给TM
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                // 设置分布式事务状态为提交成功
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }
复制代码

也就是说,TC在收到TM的提交信号后,会RPC逐个调用RM执行分支事务的提交:

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
  // RM收到分支事务提交请求
  @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                // 执行分支事务提交
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
  protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // 执行分支事务提交
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        // 返回响应结果
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
}
复制代码

根据上述代码,分布式事务的回滚其实也是TC接到TM的回滚信号后,通过RPC依次给RM发请求,触发各分支事务的回滚。

其次就是TC里面的异步提交或回滚了,在博客TC在seata分布式事务中的主要任务中,TC服务里面有多个定时任务,该定时任务就是通过定时检查各分布式事务的状态来判断是否执行提交或回滚的。所以分布式事务的提交与回滚还分为异步方式和同步方式。

小结

根据上述源码分析,我们可以知道,分支事务的提交或回滚,是直接由TC服务触发,TM间接触发的;步骤可简单分解为以下几步:

1.RM执行分支事务;

  • 1.1:分支事务执行成功;
  • 1.2:分支事务执行失败;

2.TM管理分布式事务的提交或回滚;

  • 2.1:当任意RM分支事务执行出现异常,TM通过RPC向TC服务发起分布式事务的回滚;
  • 2.2:只有所有RM都成功提交分支事务,TM通过RPC向TC服务发起分布式事务的提交;

3.TC服务依次向所有RM发起提交或回滚;

  • 3.1:TC接收到TM的提交请求,查询出所有的分支RM,并依次向所有的RM发起提交请求;
  • 3.2:TC接收到TM的回滚请求,查询所有的分支RM,依次向所有RM发起回滚请求;

4.RM执行分支事务的提交或回滚;

  • 4.1:RM收到TC的提交请求,提交分支事务;
  • 4.2:RM收到TC的回滚请求,回滚分支事务;


相关文章
|
2天前
|
缓存 Java 开发者
10个点介绍SpringBoot3工作流程与核心组件源码解析
Spring Boot 是Java开发中100%会使用到的框架,开发者不仅要熟练使用,对其中的核心源码也要了解,正所谓知其然知其所以然,V 哥建议小伙伴们在学习的过程中,一定要去研读一下源码,这有助于你在开发中游刃有余。欢迎一起交流学习心得,一起成长。
|
3天前
|
消息中间件 缓存 前端开发
Netty消息编码及发送源码解析
Netty消息编码及发送源码解析
6 0
|
6天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
14天前
yolo-world 源码解析(六)(2)
yolo-world 源码解析(六)
45 0
|
14天前
yolo-world 源码解析(六)(1)
yolo-world 源码解析(六)
44 0
|
15天前
yolo-world 源码解析(五)(4)
yolo-world 源码解析(五)
47 0
|
15天前
yolo-world 源码解析(五)(1)
yolo-world 源码解析(五)
61 0
|
15天前
yolo-world 源码解析(二)(2)
yolo-world 源码解析(二)
58 0
|
15天前
Marker 源码解析(二)(3)
Marker 源码解析(二)
19 0

推荐镜像

更多