Fescar - RM 全局事务提交回滚流程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 开篇  这篇文章的目的主要是讲解RM在接收TC的请求后执行全局分支事务提交(doBranchCommit)和全局分支事务回滚(doBranchRollback)的流程。  全局的分支事务提交过程和回滚过程也算RM处理流程中核心的一环,了解以后并结合之前讲解的本地事务提交流程就能够较好的理解整个过程了。

开篇

 这篇文章的目的主要是讲解RM在接收TC的请求后执行全局分支事务提交(doBranchCommit)和全局分支事务回滚(doBranchRollback)的流程。

 全局的分支事务提交过程和回滚过程也算RM处理流程中核心的一环,了解以后并结合之前讲解的本地事务提交流程就能够较好的理解整个过程了。


全局事务操作流程

整体流程

public class RMHandlerAT extends AbstractRMHandlerAT implements 
                   RMInboundHandler, TransactionMessageHandler {

    private DataSourceManager dataSourceManager = DataSourceManager.get();

    @Override
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)  
       throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch commit result: " + status);

    }

    @Override
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)  
       throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);
        BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch rollback result: " + status);

    }
}

说明:

  • doBranchCommit()通过dataSourceManager.branchCommit()去执行分支事务提交。
  • doBranchRollback()通过dataSourceManager.branchRollback()去执行分支事务回滚。
  • dataSourceManager是DataSourceManager对象。


doBranchCommit流程

public class DataSourceManager implements ResourceManager {

    private ResourceManagerInbound asyncWorker;

    public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
        this.asyncWorker = asyncWorker;
    }

    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData)  
       throws TransactionException {
        return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
    }
}


public class AsyncWorker implements ResourceManagerInbound {

    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) 
     throws TransactionException {
        if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
            ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData));
        } else {
            LOGGER.warn("Async commit buffer is FULL. 
            Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1,
            new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {

                    doBranchCommits();

                } catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... " + e.getMessage());

                }
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }



    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
        Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
        while (iterator.hasNext()) {
            Phase2Context commitContext = iterator.next();
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
            if (contextsGroupedByResourceId == null) {
                contextsGroupedByResourceId = new ArrayList<>();
                mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
            }
            contextsGroupedByResourceId.add(commitContext);

            iterator.remove();

        }

        for (String resourceId : mappedContexts.keySet()) {
            Connection conn = null;
            try {
                try {
                    DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                    continue;
                }

                List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(resourceId);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    try {
                        UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to delete undo log [" + 
                         commitContext.branchId + "/" + commitContext.xid + "]", ex);
                    }
                }

            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }

        }

    }
}

说明:

  • doBranchCommit()操作的核心实现通过AsyncWorker完成,AsyncWorker类其实是一个生成消费模型。
  • doBranchCommit()把需要提交的任务添加到AsyncWorker的ASYNC_COMMIT_BUFFER队列当中。
  • AsyncWorker内部timerExecutor负责启动执行commit动作线程执行doBranchCommits()动作。
  • doBranchCommits动作内部负责删除多余的UndoLog, UndoLogManager.deleteUndoLog。
  • doBranchCommit()的本质任务就是删除备份的回滚日志而已。


doBranchRollback流程

public class DataSourceManager implements ResourceManager {

    public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) 
       throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            // 执行回滚操作
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }
}


public final class UndoLogManager {
    private static String SELECT_UNDO_LOG_SQL = 
    "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE";

    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) 
       throws TransactionException {

        assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType());

        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        try {
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            conn.setAutoCommit(false);

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            // 遍历所有回滚日志
            while (rs.next()) {
                Blob b = rs.getBlob("rollback_info");
                String rollbackInfo = StringUtils.blob2string(b);
                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);

                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                    sqlUndoLog.setTableMeta(tableMeta);
                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                                      dataSourceProxy.getDbType(), sqlUndoLog);
                    undoExecutor.executeOn(conn);
                }

            }
            deleteUndoLog(xid, branchId, conn);

            conn.commit();

        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e);

        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}

说明:

  • doBranchRollback操作的核心实现通过UndoLogManager完成,UndoLogManager.undo()负责执行回滚。
  • undo()操作的核心是通过SELECT_UNDO_LOG_SQL日志去获取回滚日志内容。
  • 根据undoLog对象通过UndoExecutorFactory.getUndoExecutor获取回滚的执行者Executor对象。
  • undoExecutor.executeOn(conn)执行回滚操作,不同的回滚操作对象不同的undoExecutor
  • deleteUndoLog(xid, branchId, conn)执行日志删除操作。


期待

 下篇文章会针对undoExecutor作具体的介绍。


Fescar源码分析连载

Fescar 源码解析系列

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
6月前
|
开发者
seata事务问题之不回滚客户端如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
423 11
|
3月前
|
数据库 微服务
GTS事务执行过程
【8月更文挑战第25天】
50 4
|
5月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
219 7
|
SQL 数据库
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
342 0
源码解析Seata AT模式中分支事务的提交或回滚是如何被触发的
|
SQL 关系型数据库 MySQL
阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题
阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题
405 0
阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题
|
开发工具 git
回滚代码
回滚代码
313 0
回滚代码
|
SQL Cloud Native NoSQL
分布式事务Seata源码解析九:分支事务如何注册到全局事务
分布式事务Seata源码解析九:分支事务如何注册到全局事务
844 0
分布式事务Seata源码解析九:分支事务如何注册到全局事务
RM在seata AT模式中如何实现分支事务提交或回滚
RM在seata AT模式中如何实现分支事务提交或回滚
455 0
|
XML Java 中间件
Seata 分支事务
前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文先来介绍 Seata 中分支事务的整体实现思想。
|
SQL druid 关系型数据库
Seata AT 分支事务
前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 AT 模式分支事务的实现。