Fescar - RM SelectForUpdateExecutor介绍

简介: 开篇 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。

开篇

 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。

 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的实现类SelectForUpdateExecutor。


类依赖图


说明:

  • 着重讲解SelectForUpdateExecutor实现类。
  • SelectForUpdateExecutor的实现和其他的Executor从类图实现上不同,因为select不需要保存镜像。

SelectForUpdateExecutor方法介绍

public class SelectForUpdateExecutor<S extends Statement> extends BaseTransactionalExecutor<ResultSet, S> {

    public SelectForUpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<ResultSet, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public Object doExecute(Object... args) throws Throwable {
        SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;

        Connection conn = statementProxy.getConnection();
        ResultSet rs = null;
        Savepoint sp = null;
        LockRetryController lockRetryController = new LockRetryController();
        boolean originalAutoCommit = conn.getAutoCommit();

        StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
        selectSQLAppender.append(getColumnNameInSQL(getTableMeta().getPkName()));
        selectSQLAppender.append(" FROM " + getFromTableInSQL());
        String whereCondition = null;
        ArrayList<Object> paramAppender = new ArrayList<>();
        if (statementProxy instanceof ParametersHolder) {
            whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
        } else {
            whereCondition = recognizer.getWhereCondition();
        }
        if (!StringUtils.isEmpty(whereCondition)) {
            selectSQLAppender.append(" WHERE " + whereCondition);
        }
        selectSQLAppender.append(" FOR UPDATE");
        String selectPKSQL = selectSQLAppender.toString();

        try {
            if (originalAutoCommit) {
                conn.setAutoCommit(false);
            }
            sp = conn.setSavepoint();
            rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

            while (true) {
                // Try to get global lock of those rows selected
                Statement stPK = null;
                PreparedStatement pstPK = null;
                ResultSet rsPK = null;
                try {
                    if (paramAppender.isEmpty()) {
                        stPK = statementProxy.getConnection().createStatement();
                        rsPK = stPK.executeQuery(selectPKSQL);
                    } else {
                        pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                        for (int i = 0; i < paramAppender.size(); i++) {
                            pstPK.setObject(i + 1, paramAppender.get(i));
                        }
                        rsPK = pstPK.executeQuery();
                    }

                    TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                    statementProxy.getConnectionProxy().checkLock(selectPKRows);
                    break;

                } catch (LockConflictException lce) {
                    conn.rollback(sp);
                    lockRetryController.sleep(lce);

                } finally {
                    if (rsPK != null) {
                        rsPK.close();
                    }
                    if (stPK != null) {
                        stPK.close();
                    }
                    if (pstPK != null) {
                        pstPK.close();
                    }
                }
            }

        } finally {
            if (sp != null) {
                conn.releaseSavepoint(sp);
            }
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
        return rs;
    }
}

说明:

  • 从类的依赖图可以看出SelectForUpdateExecutor的实现方式和其他Executor不一样。
  • SelectForUpdateExecutor因为涉及到的查询操作,所以没有执行前后镜像的问题。
  • SelectForUpdateExecutor的执行逻辑在于设置回滚点,conn.setSavepoint()。
  • SelectForUpdateExecutor通过拼接查询主键的SQL语句获取查询记录的主键key。
  • SelectForUpdateExecutor根据查询主键的记录判断是否可以锁checkLock,如果不能锁则直接回滚,然后进行重试。
目录
相关文章
|
Java Nacos Docker
Docker安装Seata分布式事务
Docker安装Seata分布式事务
Docker安装Seata分布式事务
|
6月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
消息中间件 前端开发 Java
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
251 10
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
RM在seata AT模式中如何实现分支事务提交或回滚
RM在seata AT模式中如何实现分支事务提交或回滚
461 0
|
SQL 中间件 FESCAR
分布式事务中间件 Fescar—RM 模块源码解读
前言 在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所在的团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
27637 71
|
Java 微服务 Spring
Seata 客户端需要同时启动 RM 和 TM 吗?
在分析启动部分源码时,我发现 GlobalTransactionScanner 会同时启动 RM 和 TM client,但根据 Seata 的设计来看,TM 负责全局事务的操作,如果一个服务中不需要开启全局事务,此时是不需要启动 TM client的,也就是说项目中如果没有全局事务注解,此时是不是就不需要初始化 TM client 了,因为不是每个微服务,都需要 GlobalTransactional,它此时仅仅作为一个 RM client 而已。
147 0
Seata 客户端需要同时启动 RM 和 TM 吗?
|
Shell Nacos 微服务
seata夸服务器服务注册RM失败问题
seata夸服务器服务注册RM失败问题
637 0
|
SQL 中间件 Java
分布式事务中间件Fescar—RM模块源码解读
在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所有团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
2760 2
|
SQL FESCAR 安全
Fescar - RM UpdateExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1120 0
|
SQL FESCAR
Fescar - RM DeleteExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1090 0