Fescar - RM UpdateExecutor介绍

简介: 开篇 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。

开篇

 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。

 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的实现类UpdateExecutor。


类依赖图


说明:

  • 着重讲解UpdateExecutor实现类。


UpdateExecutor方法介绍

public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;

        TableMeta tmeta = getTableMeta();
        List<String> updateColumns = recognizer.getUpdateColumns();

        StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
        if (!tmeta.containsPK(updateColumns)) {
            // PK should be included.
            selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
        }
        for (int i = 0; i < updateColumns.size(); i++) {
            selectSQLAppender.append(updateColumns.get(i));
            if (i < (updateColumns.size() - 1)) {
                selectSQLAppender.append(", ");
            }
        }
        String whereCondition = null;
        ArrayList<Object> paramAppender = new ArrayList<>();
        if (statementProxy instanceof ParametersHolder) {
            whereCondition = recognizer.getWhereCondition((ParametersHolder)statementProxy, paramAppender);
        } else {
            whereCondition = recognizer.getWhereCondition();
        }
        selectSQLAppender.append(" FROM " + getFromTableInSQL() + " WHERE " + whereCondition + " FOR UPDATE");
        String selectSQL = selectSQLAppender.toString();

        TableRecords beforeImage = null;
        PreparedStatement ps = null;
        Statement st = null;
        ResultSet rs = null;
        try {
            if (paramAppender.isEmpty()) {
                st = statementProxy.getConnection().createStatement();
                rs = st.executeQuery(selectSQL);
            } else {
                ps = statementProxy.getConnection().prepareStatement(selectSQL);
                for (int i = 0; i< paramAppender.size(); i++) {
                    ps.setObject(i + 1, paramAppender.get(i));
                }
                rs = ps.executeQuery();
            }
            beforeImage = TableRecords.buildRecords(tmeta, rs);

        } finally {
            if (rs != null) {
                rs.close();
            }
            if (st != null) {
                st.close();
            }
            if (ps != null) {
                ps.close();
            }
        }
        return beforeImage;
    }
}

说明:

  • UpdateExecutor需要保存SQL执行前的镜像。
  • 执行前镜像的准备整体思路是按照按照update的更新条件逆向拼接正向查询SQL。
  • 查询字段当中必须包含主键原因是因为afterImage()操作当中需要根据主键进行查询。原因猜测因为update会把where条件中字段的值变更,保存主键最安全。
  • 查询的SQL的select的字段从update字段中解析并加上主键列
  • 查询的SQL的条件通过解析update的条件生成。
  • 根据TableRecords.buildRecords生成执行前镜像。


public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }


    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;

        TableMeta tmeta = getTableMeta();
        if (beforeImage == null || beforeImage.size() == 0) {
            return TableRecords.empty(getTableMeta());
        }
        List<String> updateColumns = recognizer.getUpdateColumns();

        StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
        if (!tmeta.containsPK(updateColumns)) {
            // PK should be included.
            selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
        }
        for (int i = 0; i < updateColumns.size(); i++) {
            selectSQLAppender.append(updateColumns.get(i));
            if (i < (updateColumns.size() - 1)) {
                selectSQLAppender.append(", ");
            }
        }
        List<Field> pkRows = beforeImage.pkRows();
        selectSQLAppender.append(
            " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(pkRows) + " FOR UPDATE");
        String selectSQL = selectSQLAppender.toString();

        TableRecords afterImage = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        try {
            pst = statementProxy.getConnection().prepareStatement(selectSQL);
            int index = 0;
            for (Field pkField : pkRows) {
                index++;
                pst.setObject(index, pkField.getValue(), pkField.getType());
            }
            rs = pst.executeQuery();
            afterImage = TableRecords.buildRecords(tmeta, rs);

        } finally {
            if (rs != null) {
                rs.close();
            }
            if (pst != null) {
                pst.close();
            }
        }
        return afterImage;
    }
}

说明:

  • UpdateExecutor需要保存SQL执行后的镜像。
  • UpdateExecutor执行后的镜像通过查询更新的字段字段生成的。
  • 查询更新后镜像的SQL的select的字段从update字段中解析并加上主键列。
  • 查询更新后镜像的SQL的条件是beforeImage()查询得到的主键key。
  • 根据TableRecords.buildRecords生成执行前镜像。


期待

  UpdateExecutor生成执行前后镜像的过程分析完后,接下去会分析生成镜像日志的流程。

目录
相关文章
|
Java Nacos Docker
Docker安装Seata分布式事务
Docker安装Seata分布式事务
Docker安装Seata分布式事务
|
6月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
消息中间件 前端开发 Java
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
251 10
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
RM在seata AT模式中如何实现分支事务提交或回滚
RM在seata AT模式中如何实现分支事务提交或回滚
461 0
|
SQL 中间件 FESCAR
分布式事务中间件 Fescar—RM 模块源码解读
前言 在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所在的团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
27637 71
|
Java 微服务 Spring
Seata 客户端需要同时启动 RM 和 TM 吗?
在分析启动部分源码时,我发现 GlobalTransactionScanner 会同时启动 RM 和 TM client,但根据 Seata 的设计来看,TM 负责全局事务的操作,如果一个服务中不需要开启全局事务,此时是不需要启动 TM client的,也就是说项目中如果没有全局事务注解,此时是不是就不需要初始化 TM client 了,因为不是每个微服务,都需要 GlobalTransactional,它此时仅仅作为一个 RM client 而已。
147 0
Seata 客户端需要同时启动 RM 和 TM 吗?
|
Shell Nacos 微服务
seata夸服务器服务注册RM失败问题
seata夸服务器服务注册RM失败问题
637 0
|
SQL 中间件 Java
分布式事务中间件Fescar—RM模块源码解读
在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所有团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
2760 2
|
FESCAR SQL
Fescar - RM SelectForUpdateExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1180 0
|
SQL FESCAR
Fescar - RM DeleteExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1090 0