Fescar undoExecutor介绍

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 开篇  这篇文章的目的主要是讲解RM的执行回滚的Executor对象即undoExecutor,执行回滚日志就是由undoExecutor去执行的。 undoExecutor源码分析 public class UndoExecutorFactory { public static Ab.

开篇

 这篇文章的目的主要是讲解RM的执行回滚的Executor对象即undoExecutor,执行回滚日志就是由undoExecutor去执行的。


undoExecutor源码分析

public class UndoExecutorFactory {

    public static AbstractUndoExecutor getUndoExecutor(
         String dbType, SQLUndoLog sqlUndoLog) {
        if (!dbType.equals(JdbcConstants.MYSQL)) {
            throw new NotSupportYetException(dbType);
        }
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}

说明:

  • UndoExecutorFactory负责根据不同的回滚日志返回对应的undoExecutor对象。



UndoExecutor类依赖图

说明:

  • AbstractUndoExecutor作为回滚类的抽象基类。
  • MySQLUndoDeleteExecutor负责回滚delete操作。
  • MySQLUndoInsertExecutor负责回滚insert操作。
  • MySQLUndoUpdateExecutor负责回滚update操作。


AbstractUndoExecutor

public abstract class AbstractUndoExecutor {

    protected SQLUndoLog sqlUndoLog;

    protected abstract String buildUndoSQL();

    public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) {
        this.sqlUndoLog = sqlUndoLog;
    }

    public void executeOn(Connection conn) throws SQLException {
        dataValidation(conn);

        try {
            // 拼接undoSql的模板
            String undoSQL = buildUndoSQL();
            // 获取PreparedStatement对象
            PreparedStatement undoPST = conn.prepareStatement(undoSQL);
            // 获取回滚的记录
            TableRecords undoRows = getUndoRows();
            // 遍历所有待回滚的记录然后一条条的拼接字段
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }
                // 针对每一条回滚记录进行准备
                undoPrepare(undoPST, undoValues, pkValue);
               // 执行回滚操作
                undoPST.executeUpdate();
            }

        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }

    }

    protected void undoPrepare(PreparedStatement undoPST, 
              ArrayList<Field> undoValues, 
              Field pkValue) throws SQLException {
        int undoIndex = 0;
        for (Field undoValue : undoValues) {
            undoIndex++;
            undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType());
        }
        // PK is at last one.
        // INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
        // UPDATE a SET x=?, y=?, z=? WHERE pk = ?
        // DELETE FROM a WHERE pk = ?
        undoIndex++;
        undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType());
    }

    protected abstract TableRecords getUndoRows();

    protected void dataValidation(Connection conn) throws SQLException {
        // Validate if data is dirty.
    }
}

说明:

  • AbstractUndoExecutor定义了回滚操作的整个命令行模板流程。
  • 拼接undoSql的模板,buildUndoSQL()。
  • 获取PreparedStatement对象,conn.prepareStatement(undoSQL)。
  • 遍历所有待回滚的记录然后一条条的拼接字段。
  • 针对每一条回滚记录进行准备,undoPrepare(undoPST, undoValues, pkValue)。
  • 执行回滚操作,undoPST.executeUpdate()。
  • buildUndoSQL()和getUndoRows()由子类具体实现。


MySQLUndoInsertExecutor

public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {

    @Override
    protected String buildUndoSQL() {
        TableRecords afterImage = sqlUndoLog.getAfterImage();
        List<Row> afterImageRows = afterImage.getRows();
        if (afterImageRows == null || afterImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = afterImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
        "DELETE FROM " + sqlUndoLog.getTableName());
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ? ");
            }

        }
        return mainSQL.append(where).toString();
    }

    @Override
    protected void undoPrepare(PreparedStatement undoPST, 
           ArrayList<Field> undoValues, Field pkValue) 
        throws SQLException {
        undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
    }

    public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getAfterImage();
    }
}

说明:

  • Insert的回滚操作在于逆向进行delete操作,MySQLUndoInsertExecutor负责拼接delete的SQL。
  • delete的SQL的where条件就是insert生成的主键primary key。
  • 整个回滚操作在父类AbstractUndoExecutor定义。


MySQLUndoDeleteExecutor

public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {

    public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = beforeImageRows.get(0);

        StringBuffer insertColumns = new StringBuffer();
        StringBuffer insertValues = new StringBuffer();
        Field pkField = null;
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                pkField = field;
                continue;
            } else {
                if (first) {
                    first = false;
                } else {
                    insertColumns.append(", ");
                    insertValues.append(", ");
                }
                insertColumns.append(field.getName());
                insertValues.append("?");
            }

        }
        if (first) {
            first = false;
        } else {
            insertColumns.append(", ");
            insertValues.append(", ");
        }
        insertColumns.append(pkField.getName());
        insertValues.append("?");

        return "INSERT INTO " + sqlUndoLog.getTableName() 
        + "(" + insertColumns.toString() + ") 
        VALUES (" + insertValues.toString() + ")";
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}

说明:

  • Delete的回滚操作在于逆向进行Insert操作,MySQLUndoDeleteExecutor负责拼接Insert的SQL。
  • Insert的拼接的SQL是insert tableName (column1,column2) values (?,?).
  • 整个回滚操作在父类AbstractUndoExecutor定义。


MySQLUndoUpdateExecutor

public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {

    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
        }
        Row row = beforeImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
          "UPDATE " + sqlUndoLog.getTableName() + " SET ");
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ?");
            } else {
                if (first) {
                    first = false;
                } else {
                    mainSQL.append(", ");
                }
                mainSQL.append(field.getName() + " = ?");
            }

        }
        return mainSQL.append(where).toString();
    }

    public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}

说明:

  • update的回滚操作在于逆向进行update操作,MySQLUndoUpdateExecutor负责拼接update的SQL。
  • Insert的拼接的SQL是update tableName set column1=? where column=?。
  • 整个回滚操作在父类AbstractUndoExecutor定义。


Fescar源码分析连载

Fescar 源码解析系列

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
8月前
|
存储 关系型数据库 Java
技术经验解读:三种分布式事务LCN、Seata、MQ
技术经验解读:三种分布式事务LCN、Seata、MQ
270 0
|
Dubbo 关系型数据库 应用服务中间件
快速玩转Dubbo生态之一致性事务篇(Seata)
本场景将对Dubbo接入一致性事务的方式进行介绍,带您体验快速上手Dubbo接入Seata生态。
|
中间件 uml
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
409 14
阿里中间件seata源码剖析六:TCC模式中2阶段提交实现
|
算法 中间件 Java
阿里中间件seata源码剖析五:聊聊seata中全局事务的开启
阿里中间件seata源码剖析五:聊聊seata中全局事务的开启
345 10
阿里中间件seata源码剖析五:聊聊seata中全局事务的开启
分布式事务,阿里为什么钟爱TCC
分布式事务,阿里为什么钟爱TCC
565 1
分布式事务,阿里为什么钟爱TCC
|
SQL NoSQL Java
还不会分布式事务,seata xa模式入门实战送上
还不会分布式事务,seata xa模式入门实战送上
799 0
还不会分布式事务,seata xa模式入门实战送上
|
SQL JSON 数据库
分布式事务(Seata) 四大模式详解
今天就来讲解关于Seata中分布式四种模型的介绍
856 0
分布式事务(Seata) 四大模式详解
|
SQL 中间件 FESCAR
分布式事务中间件 Fescar—RM 模块源码解读
前言 在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所在的团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
27674 79
|
JSON Cloud Native 算法
深度剖析Saga分布式事务
saga是分布式事务领域非常重要的事务模式,特别适合解决旅游订票等长期事务。本文将深入分析saga事务的设计原理和解决订票问题的最佳实践。
972 0
深度剖析Saga分布式事务
|
SQL 中间件 FESCAR
分布式事务中间件Fescar—全局写排它锁解读
一般,数据库事务的隔离级别会被设置成 ***读已提交***,已满足业务需求,这样对应在Fescar中的分支(本地)事务的隔离级别就是 读已提交,那么Fescar中对于全局事务的隔离级别又是什么呢?如果认真阅读了 分布式事务中间件Fescar-RM模块源码解读 的同学...
2348 11