RM在seata AT模式中如何实现分支事务提交或回滚

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RM在seata AT模式中如何实现分支事务提交或回滚

前言

在之前的博客中,我们已经知道了,RM分支事务的提交或回滚是由TC服务下发的指令触发的。

  • 第一种情况:当任意RM的业务处理出现异常,都会触发TM发起全局事务的回滚,相关的回滚指令由TM下发给TC,最终TC把回滚指令依次下发给所有的RM,通过所有分支事务的回滚达到全局事务回滚的目的;
  • 第二种情况:当所有RM都成功提交分支事务后,TM发起全局事务提交指令给到TC服务,TC收到指令后,同样会依次调用所有RM发起分支事务的提交,以便达到全局事务提交的目的;

这篇博客的目的就是介绍RM是如何提交或回滚分支事务的。

TC与RM间的通信

因为RM与TC之间的通信也是基于Netty实现,所以TC下发的提交请求必然是需要对应的处理器来处理的:

public class DefaultRMHandler extends AbstractRMHandler {
  // 接收TC下发的提交请求
  @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    // 接收TC下发的回滚请求
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
}
// 上面的提交或回滚请求,最终会调用到AbstractRMHandler里面的逻辑
public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
  // 下面使用到了最熟悉的模版模式
  @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
  // 回滚也是一样,使用模版模式
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
}
复制代码

根据上面的源码可知,提交或回滚请求进来后,通过一系列的调用,最终到达了AbstractRMHandler类的处理逻辑中,最核心的逻辑在doBranchCommit(request, response)doBranchRollback(request, response)中;

RM分支事务提交

AbstractRMHandler.doBranchCommit(request, response)

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        // 拆解请求参数
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // 执行分支事务提交
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        // 返回响应结果
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
复制代码

在上面的代码中,只做了三件事情:

1.先把请求参数拆解出来;

2.调用相应的ResourceManager提交分支事务;

3.返回响应结果;

我们再深入看一下RM是如何提交分支事务的:

public class DataSourceManager extends AbstractResourceManager {
  @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // 异步提交
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }
}
复制代码

为什么说上面的asyncWorker.branchCommit是异步提交呢?因为在asyncWorker中有一个定时任务,每秒中从阻塞队列中取需要提交的分支事务,并完成分支事务的提交;asyncWorker.branchCommit的逻辑只是把需要提交的分支事务放进阻塞队列中;

public class AsyncWorker {
  public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
        ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
        scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
        // 创建定时任务,每秒中调用doBranchCommitSafely提交分支事务
        scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
    }
  // 异步提交分支事务
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        // 把分支事务放进阻塞队列中
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }
}
复制代码

所以,真正的提交逻辑在doBranchCommitSafely中:

void doBranchCommitSafely() {
        try {
            // 提交分支事务
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }
    // 真正干活的代码
    private void doBranchCommit() {
        // 如果阻塞队列中没有需要提交的分支事务,那么直接返回
        if (commitQueue.isEmpty()) {
            return;
        }
        // 这里把阻塞队列中的数据放到了allContexts中,属于CopyOnWrite的思想,下面就可以专注操作allContexts
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);
        // 通过resourceId对Phase2Context进行分组
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
        // 循环调用dealWithGroupedContexts
        groupedContexts.forEach(this::dealWithGroupedContexts);
    }
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        if (StringUtils.isBlank(resourceId)) {
            //ConcurrentHashMap required notNull key
            LOGGER.warn("resourceId is empty and will skip.");
            return;
        }
        // 通过resourceId获取DataSourceProxy
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("failed to find resource for {} and requeue", resourceId);
            // 如果没拿到DataSourceProxy,重新放进阻塞队列中,等待下次提交
            addAllToCommitQueue(contexts);
            return;
        }
        Connection conn = null;
        try {
            // 获取原生Connection,这里就不需要代理Connection了
            conn = dataSourceProxy.getPlainConnection();
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
            // 按照每次最多1000个Phase2Context来分割所有的Phase2Context
            List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
            // 删除该分支事务对应的所有的undolog
            for (List<Phase2Context> partition : splitByLimit) {
                deleteUndoLog(conn, undoLogManager, partition);
            }
        } catch (SQLException sqlExx) {
            //如果出了异常,那么重新回到阻塞队列中,等待下次提交
            addAllToCommitQueue(contexts);
            LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
        } finally {
            // 释放链接
            IOUtil.close(conn);
        }
    }
// 真正的删除undolog逻辑
private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
        Set<String> xids = new LinkedHashSet<>(contexts.size());
        Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
        contexts.forEach(context -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });
        try {
            // 构建删除undolog的sql语句,并执行
            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            if (!conn.getAutoCommit()) {
                // 提交删除结果
                conn.commit();
            }
        } catch (SQLException e) {
            LOGGER.error("Failed to batch delete undo log", e);
            try {
                // 出异常,就回滚,并重新加入待提交队列
                conn.rollback();
                addAllToCommitQueue(contexts);
            } catch (SQLException rollbackEx) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
            }
        }
    }
复制代码

所以,综上所属,分支事务的提交主要有以下两点:

  • 分支事务的提交其实是定时任务异步提交的,RM中的asyncWorker每秒中会对阻塞队列中的待提交分支事务进行提交;
  • 分支事务的提交其实就是简单地删除对应的undolog日志即可;

RM分支事务回滚

AbstractRMHandler.doBranchRollback(request, response)

/**
     * Do branch rollback.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }
复制代码

同样也是分以下三步:

1.拆解回滚请求参数;

2.调用ResourceManager执行分支事务的回滚;

3.返回响应结果;

public class DataSourceManager extends AbstractResourceManager {
  // 分支事务回滚
  @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        // 获取DataSourceProxy
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
        }
        try {
          // 回滚主要逻辑在undo()里面
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            // 回滚失败,直接返回,并告知是否可以重试
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        // 回滚成功
        return BranchStatus.PhaseTwo_Rollbacked;
    }
}
复制代码

所以,我们应该继续深入undo()逻辑里面,看看具体是如何回滚的;

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;
        // 回滚的逻辑使用了死循环,直至回滚成功或失败
        for (; ; ) {
            try {
                // DataSourceProxy主要是为了拿到原生Connection
                conn = dataSourceProxy.getPlainConnection();
                // 改成手动提交
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }
                // 查询undolog
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
                boolean exists = false;
                // 判断是否有undolog
                while (rs.next()) {
                    // 进入while循环,说明一定有undolog
                    exists = true;
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    // 判断当前undolog是否可以操作
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        // 不可用操作就返回
                        return;
                    }
                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    // 最终通过一系列的解析,最终得到BranchUndoLog实体对象
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        // 循环执行回滚操作
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            // 回滚逻辑主要在这里面
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }
                // 回滚完成后,删除undolog,并提交事务
                if (exists) {
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                } else {
                    // 如果没有找到undolog日志,说明可能在分支事务注册的时候,undolog还没来得及添加,因为超时或者其他分支事务异常触发了TM发起全局事务回滚,所以这里立马插入一条记录,以防undolog被RM插入
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                }
                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // 产生SQLIntegrityConstraintViolationException异常后,将重新循环执行回滚
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable e) {
                // 如果产生其他异常,那么将回滚前面的操作,并抛出异常跳出循环
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);
            } finally {
                // 恢复现场
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }
复制代码

在分支事务的回滚逻辑中,我们还需要额外考虑分支事务注册后,还没来得及插入undolog的情况;

  • 如果找到undolog,那么执行回滚操作;
  • 如果没有undolog,那么先插入一条记录标记,以防undolog被插入;如果标记插入失败,说明undolog已经被其他线程插入成功了,那么重新循环执行回滚操作;
public void executeOn(Connection conn) throws SQLException {
        // 如果开启校验undo日志,并且数据校验失败,不能执行回滚
        // 主要目的是为了确认当前数据和后镜像是否一致,否则就是数据被污染了,不能回滚
        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        // 下面才是真正的回滚逻辑
        PreparedStatement undoPST = null;
        try {
            // 构建undoSQL,这里用到了策略模式
            String undoSQL = buildUndoSQL();
            // 获取对应的prepareStatement
            undoPST = conn.prepareStatement(undoSQL);
            // 构建反向sql
            TableRecords undoRows = getUndoRows();
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                        undoValues.add(field);
                    }
                }
                // 设置参数
                undoPrepare(undoPST, undoValues, pkValueList);
                // 执行回滚
                undoPST.executeUpdate();
            }
        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        finally {
            //important for oracle
            IOUtil.close(undoPST);
        }
    }
复制代码

分支事务的回滚其实就是根据后前后镜像构建反向sql的原理实现的,在这个回滚的逻辑中,需要注意以下几点:

  • 分支事务的回滚不是异步的;
  • 在回滚的过程中,默认是需要校验数据是否被污染;在有完全把握的情况下,开发人员也可以自己配置成回滚不校验以提升效率

小结

根据上述源码分析,我们可以简单归纳为以下几点:

1.分支事务的提交是异步完成的;

2.分支事务的提交只需要删除undolog日志;

3.分支事务的回滚与提交不同,它不是异步的;

4.回滚需要考虑是否存在undolog,以及如何防止undolog被其他线程插入,如果被其他线程插入该如何处理;这里需要结合分支事务的注册逻辑来看;seata使用的方式是在回滚的逻辑中,如果没有发现undolog,为了避免被其他线程插入,自己先插入一条标记占位;如果标记插入失败,说明undolog刚刚被其他线程插入成功了,那么重新循环执行回滚逻辑;

5.在回滚过程中,有一个校验数据是否被污染的逻辑,这里可以让开发人员配置是否需要校验,默认情况下是需要校验afterImage是否和当前数据一致,否则标志着数据已经被污染了,就不能执行回滚了;

6.回滚原理其实就是根据前后镜像生成反向sql,用来还原之前的数据;



相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
27天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
37 3
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
6月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
7月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
7月前
|
监控 API 数据库
Seata常见问题之Seata AT的设计不支持使用临时表如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
7月前
|
SQL 监控 Java
Seata常见问题之报找不到全局事务可能已经完成如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
1195 0
|
1月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
17天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
124 7
|
29天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
50 6
|
29天前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
29 6