Fescar undoExecutor介绍

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 开篇  这篇文章的目的主要是讲解RM的执行回滚的Executor对象即undoExecutor,执行回滚日志就是由undoExecutor去执行的。 undoExecutor源码分析 public class UndoExecutorFactory { public static Ab.

开篇

 这篇文章的目的主要是讲解RM的执行回滚的Executor对象即undoExecutor,执行回滚日志就是由undoExecutor去执行的。


undoExecutor源码分析

public class UndoExecutorFactory {

    public static AbstractUndoExecutor getUndoExecutor(
         String dbType, SQLUndoLog sqlUndoLog) {
        if (!dbType.equals(JdbcConstants.MYSQL)) {
            throw new NotSupportYetException(dbType);
        }
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}

说明:

  • UndoExecutorFactory负责根据不同的回滚日志返回对应的undoExecutor对象。



UndoExecutor类依赖图

说明:

  • AbstractUndoExecutor作为回滚类的抽象基类。
  • MySQLUndoDeleteExecutor负责回滚delete操作。
  • MySQLUndoInsertExecutor负责回滚insert操作。
  • MySQLUndoUpdateExecutor负责回滚update操作。


AbstractUndoExecutor

public abstract class AbstractUndoExecutor {

    protected SQLUndoLog sqlUndoLog;

    protected abstract String buildUndoSQL();

    public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) {
        this.sqlUndoLog = sqlUndoLog;
    }

    public void executeOn(Connection conn) throws SQLException {
        dataValidation(conn);

        try {
            // 拼接undoSql的模板
            String undoSQL = buildUndoSQL();
            // 获取PreparedStatement对象
            PreparedStatement undoPST = conn.prepareStatement(undoSQL);
            // 获取回滚的记录
            TableRecords undoRows = getUndoRows();
            // 遍历所有待回滚的记录然后一条条的拼接字段
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }
                // 针对每一条回滚记录进行准备
                undoPrepare(undoPST, undoValues, pkValue);
               // 执行回滚操作
                undoPST.executeUpdate();
            }

        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }

    }

    protected void undoPrepare(PreparedStatement undoPST, 
              ArrayList<Field> undoValues, 
              Field pkValue) throws SQLException {
        int undoIndex = 0;
        for (Field undoValue : undoValues) {
            undoIndex++;
            undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType());
        }
        // PK is at last one.
        // INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
        // UPDATE a SET x=?, y=?, z=? WHERE pk = ?
        // DELETE FROM a WHERE pk = ?
        undoIndex++;
        undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType());
    }

    protected abstract TableRecords getUndoRows();

    protected void dataValidation(Connection conn) throws SQLException {
        // Validate if data is dirty.
    }
}

说明:

  • AbstractUndoExecutor定义了回滚操作的整个命令行模板流程。
  • 拼接undoSql的模板,buildUndoSQL()。
  • 获取PreparedStatement对象,conn.prepareStatement(undoSQL)。
  • 遍历所有待回滚的记录然后一条条的拼接字段。
  • 针对每一条回滚记录进行准备,undoPrepare(undoPST, undoValues, pkValue)。
  • 执行回滚操作,undoPST.executeUpdate()。
  • buildUndoSQL()和getUndoRows()由子类具体实现。


MySQLUndoInsertExecutor

public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {

    @Override
    protected String buildUndoSQL() {
        TableRecords afterImage = sqlUndoLog.getAfterImage();
        List<Row> afterImageRows = afterImage.getRows();
        if (afterImageRows == null || afterImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = afterImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
        "DELETE FROM " + sqlUndoLog.getTableName());
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ? ");
            }

        }
        return mainSQL.append(where).toString();
    }

    @Override
    protected void undoPrepare(PreparedStatement undoPST, 
           ArrayList<Field> undoValues, Field pkValue) 
        throws SQLException {
        undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
    }

    public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getAfterImage();
    }
}

说明:

  • Insert的回滚操作在于逆向进行delete操作,MySQLUndoInsertExecutor负责拼接delete的SQL。
  • delete的SQL的where条件就是insert生成的主键primary key。
  • 整个回滚操作在父类AbstractUndoExecutor定义。


MySQLUndoDeleteExecutor

public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {

    public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = beforeImageRows.get(0);

        StringBuffer insertColumns = new StringBuffer();
        StringBuffer insertValues = new StringBuffer();
        Field pkField = null;
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                pkField = field;
                continue;
            } else {
                if (first) {
                    first = false;
                } else {
                    insertColumns.append(", ");
                    insertValues.append(", ");
                }
                insertColumns.append(field.getName());
                insertValues.append("?");
            }

        }
        if (first) {
            first = false;
        } else {
            insertColumns.append(", ");
            insertValues.append(", ");
        }
        insertColumns.append(pkField.getName());
        insertValues.append("?");

        return "INSERT INTO " + sqlUndoLog.getTableName() 
        + "(" + insertColumns.toString() + ") 
        VALUES (" + insertValues.toString() + ")";
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}

说明:

  • Delete的回滚操作在于逆向进行Insert操作,MySQLUndoDeleteExecutor负责拼接Insert的SQL。
  • Insert的拼接的SQL是insert tableName (column1,column2) values (?,?).
  • 整个回滚操作在父类AbstractUndoExecutor定义。


MySQLUndoUpdateExecutor

public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {

    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
        }
        Row row = beforeImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
          "UPDATE " + sqlUndoLog.getTableName() + " SET ");
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ?");
            } else {
                if (first) {
                    first = false;
                } else {
                    mainSQL.append(", ");
                }
                mainSQL.append(field.getName() + " = ?");
            }

        }
        return mainSQL.append(where).toString();
    }

    public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}

说明:

  • update的回滚操作在于逆向进行update操作,MySQLUndoUpdateExecutor负责拼接update的SQL。
  • Insert的拼接的SQL是update tableName set column1=? where column=?。
  • 整个回滚操作在父类AbstractUndoExecutor定义。


Fescar源码分析连载

Fescar 源码解析系列

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
Nacos 数据库
分布式事务解决方案Seata
分布式事务解决方案Seata
99 1
|
存储 SpringCloudAlibaba Java
Seata分布式事务实战 1
Seata分布式事务实战
128 0
|
存储 NoSQL Nacos
Seata分布式事务实战 2
Seata分布式事务实战
157 0
|
SQL Java 数据库
Seata分布式事务源码分析
Seata分布式事务源码分析
95 0
|
NoSQL Java Redis
【Seata】分布式事务框架Seata踩坑集锦
【Seata】分布式事务框架Seata踩坑集锦
【Seata】分布式事务框架Seata踩坑集锦
|
7月前
|
SQL 关系型数据库 FESCAR
分布式事务框架 seata
分布式事务框架 seata
162 0
|
数据库 Nacos
seata实现分布式事务
seata实现分布式事务
80 0
|
Dubbo 关系型数据库 应用服务中间件
快速玩转Dubbo生态之一致性事务篇(Seata)
本场景将对Dubbo接入一致性事务的方式进行介绍,带您体验快速上手Dubbo接入Seata生态。
|
Java 中间件 uml
阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook
阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook
275 7
阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook
|
SQL Java API
分布式事务解决框架seata介绍
分布式事务解决框架seata介绍