Fescar - RM BaseTransactionalExecutor介绍

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS Agent Manager,2核4GB
RDS Agent(兼容OpenClaw),2核4GB
简介: 开篇 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。

开篇

 这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。

 个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的父类BaseTransactionalExecutor。


类依赖图


说明:

  • 着重讲解BaseTransactionalExecutor抽象父类。


BaseTransactionalExecutor通用方法介绍

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
    protected StatementProxy<S> statementProxy;
    protected StatementCallback<T, S> statementCallback;
    protected SQLRecognizer sqlRecognizer;
    private TableMeta tableMeta;

    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, 
        StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }
}

说明:

  • BaseTransactionalExecutor的核心变量SQLRecognizer sqlRecognizer。
  • SQLRecognizer sqlRecognizer的是通过构造函数进行赋值的。
  • SQLRecognizer是由SQLVisitorFactory的工厂方法返回生成的。


public class SQLVisitorFactory {

    public static SQLRecognizer get(String sql, String dbType) {
        List<SQLStatement> asts = SQLUtils.parseStatements(sql, dbType);
        if (asts == null || asts.size() != 1) {
            throw new UnsupportedOperationException("Unsupported SQL: " + sql);
        }
        SQLRecognizer recognizer = null;
        SQLStatement ast = asts.get(0);
        if (JdbcConstants.MYSQL.equalsIgnoreCase(dbType)) {
            if (ast instanceof SQLInsertStatement) {
                recognizer = new MySQLInsertRecognizer(sql, ast);
            } else if (ast instanceof SQLUpdateStatement) {
                recognizer = new MySQLUpdateRecognizer(sql, ast);
            } else if (ast instanceof SQLDeleteStatement) {
                recognizer = new MySQLDeleteRecognizer(sql, ast);
            } else if (ast instanceof SQLSelectStatement) {
                if (((SQLSelectStatement)ast).getSelect().getQueryBlock().isForUpdate()) {
                    recognizer = new MySQLSelectForUpdateRecognizer(sql, ast);
                }
            }
        } else {
            throw new UnsupportedOperationException("Just support MySQL by now!");
        }
        return recognizer;
    }
}

说明:

  • SQLRecognizer 根据不同的SQL类型生成的。
  • MySQLInsertRecognizer MySQLUpdateRecognizer MySQLDeleteRecognizer MySQLSelectForUpdateRecognizer。
  • SQLUtils.parseStatements作为druid开源包提供的功能负责返回SQLStatement对象
  • SQLStatement是根据不同SQL生产的SQL会话对象包含SQL含有的一些通用信息


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected String buildWhereConditionByPKs(List<Field> pkRows) throws SQLException {
        StringBuffer whereConditionAppender = new StringBuffer();
        for (int i = 0; i < pkRows.size(); i++) {
            Field field = pkRows.get(i);
            whereConditionAppender.append(getColumnNameInSQL(field.getName()) + " = ?");
            if (i < (pkRows.size() - 1)) {
                whereConditionAppender.append(" OR ");
            }
        }
        return whereConditionAppender.toString();
    }
}

说明:

  • 根据primary key生成where条件语句。
  • 根据主键的个数构建 select x from table where c1=? OR c2=?格式。


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected String getColumnNameInSQL(String columnName) {
        String tableAlias = sqlRecognizer.getTableAlias();
        if (tableAlias == null) {
            return columnName;
        } else {
            return tableAlias + "." + columnName;
        }
    }
}

说明:

  • 获取SQL当中的列名,通过sqlRecognizer获取别名。
  • 如果有别名那么列名就是 tableAlias.columnName,否则就是columnName。


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected String getFromTableInSQL() {
        String tableName = sqlRecognizer.getTableName();
        String tableAlias = sqlRecognizer.getTableAlias();
        if (tableAlias == null) {
            return tableName;
        } else {
            return tableName + " " + tableAlias;
        }
    }
}

说明:

  • 获取SQL当中的表名,通过sqlRecognizer获取别名。
  • 如果有别名那么列名就是 tableName tableAlias,否则就是tableName。
  • 类似select * from tableName tableAlias。


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected TableMeta getTableMeta() {
        return getTableMeta(sqlRecognizer.getTableName());
    }

    protected TableMeta getTableMeta(String tableName) {
        if (tableMeta != null) {
            return tableMeta;
        }
        tableMeta = TableMetaCache.getTableMeta(statementProxy.getConnectionProxy().getDataSourceProxy(), tableName);
        return tableMeta;
    }
}

说明:

  • 获取Table的Meta数据,通过TableMetaCache.getTableMeta()操作实现。
  • TableMetaCache.getTableMeta()是实现Cache功能的元数据获取功能。


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected String buildLockKey(TableRecords rowsIncludingPK) {
        if (rowsIncludingPK.size() == 0) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        sb.append(rowsIncludingPK.getTableMeta().getTableName());
        sb.append(":");

        boolean flag = false;
        for (Field field : rowsIncludingPK.pkRows()) {
            if (flag) {
                sb.append(",");
            } else {
                flag = true;
            }
            sb.append(field.getValue());
        }
        return sb.toString();
    }
}

说明:

  • buildLockKey实现锁定key的生成逻辑,表名+主键列名的拼接。
  • buildLockKey的实现逻辑:tableName:pkName1,pkName2.


public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
            return;
        }

        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }


    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = sqlRecognizer.getSQLType();
        String tableName = sqlRecognizer.getTableName();

        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);
        sqlUndoLog.setTableName(tableName);
        sqlUndoLog.setBeforeImage(beforeImage);
        sqlUndoLog.setAfterImage(afterImage);
        return sqlUndoLog;
    }
}

说明:

  • prepareUndoLog负责生产待回滚记录的日志,按照执行前镜像和执行后镜像对比生成。
  • 回滚日志的存储对象是类SQLUndoLog。


public class TableMetaCache {

    public static TableMeta getTableMeta(DataSourceProxy dataSourceProxy, String tableName) {
        return getTableMeta(dataSourceProxy.getTargetDataSource(), tableName);
    }

    public static TableMeta getTableMeta(final DruidDataSource druidDataSource, final String tableName) {

        String dataSourceKey = druidDataSource.getUrl();

        TableMeta tmeta = null;
        final String key = dataSourceKey + "." + tableName;
        try {
            tmeta = TABLE_META_CACHE.get(key, new Callable<TableMeta>() {
                @Override
                public TableMeta call() throws Exception {
                    return fetchSchema(druidDataSource, tableName);
                }
            });
        } catch (ExecutionException e) {
        }

        if (tmeta == null) {
            try {
                tmeta = fetchSchema(druidDataSource, tableName);
            } catch (SQLException e) {
            }
        }
        return tmeta;
    }


    private static TableMeta fetchSchema(DruidDataSource druidDataSource, String tableName) throws SQLException {
        return fetchSchemeInDefaultWay(druidDataSource, tableName);
    }


    private static TableMeta fetchSchemeInDefaultWay(DruidDataSource druidDataSource, String tableName)
        throws SQLException {
        Connection conn = null;
        java.sql.Statement stmt = null;
        java.sql.ResultSet rs = null;
        try {
            conn = druidDataSource.getConnection();
            stmt = conn.createStatement();
            StringBuffer sb = new StringBuffer("SELECT * FROM " + tableName + " LIMIT 1");
            rs = stmt.executeQuery(sb.toString());
            ResultSetMetaData rsmd = rs.getMetaData();
            DatabaseMetaData dbmd = conn.getMetaData();

            return resultSetMetaToSchema(rsmd, dbmd, tableName);
        } catch (Exception e) {
            if (e instanceof SQLException) {
                throw ((SQLException)e);
            }
            throw new SQLException("Failed to fetch schema of " + tableName, e);

        } finally {
        }
    }

    private static TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String tableName)
        throws SQLException {
        String schemaName = rsmd.getSchemaName(1);
        String catalogName = rsmd.getCatalogName(1);

        TableMeta tm = new TableMeta();
        tm.setTableName(tableName);

        java.sql.ResultSet rs1 = dbmd.getColumns(catalogName, schemaName, tableName, "%");
        while (rs1.next()) {
            ColumnMeta col = new ColumnMeta();
            col.setTableCat(rs1.getString("TABLE_CAT"));
            col.setTableSchemaName(rs1.getString("TABLE_SCHEM"));
            col.setTableName(rs1.getString("TABLE_NAME"));
            col.setColumnName(rs1.getString("COLUMN_NAME"));
            col.setDataType(rs1.getInt("DATA_TYPE"));
            col.setDataTypeName(rs1.getString("TYPE_NAME"));
            col.setColumnSize(rs1.getInt("COLUMN_SIZE"));
            col.setDecimalDigits(rs1.getInt("DECIMAL_DIGITS"));
            col.setNumPrecRadix(rs1.getInt("NUM_PREC_RADIX"));
            col.setNullAble(rs1.getInt("NULLABLE"));
            col.setRemarks(rs1.getString("REMARKS"));
            col.setColumnDef(rs1.getString("COLUMN_DEF"));
            col.setSqlDataType(rs1.getInt("SQL_DATA_TYPE"));
            col.setSqlDatetimeSub(rs1.getInt("SQL_DATETIME_SUB"));
            col.setCharOctetLength(rs1.getInt("CHAR_OCTET_LENGTH"));
            col.setOrdinalPosition(rs1.getInt("ORDINAL_POSITION"));
            col.setIsNullAble(rs1.getString("IS_NULLABLE"));
            col.setIsAutoincrement(rs1.getString("IS_AUTOINCREMENT"));

            tm.getAllColumns().put(col.getColumnName(), col);
        }

        java.sql.ResultSet rs2 = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
        String indexName = "";
        while (rs2.next()) {
            indexName = rs2.getString("INDEX_NAME");
            String colName = rs2.getString("COLUMN_NAME");
            ColumnMeta col = tm.getAllColumns().get(colName);

            if (tm.getAllIndexes().containsKey(indexName)) {
                IndexMeta index = tm.getAllIndexes().get(indexName);
                index.getValues().add(col);
            } else {
                IndexMeta index = new IndexMeta();
                index.setIndexName(indexName);
                index.setNonUnique(rs2.getBoolean("NON_UNIQUE"));
                index.setIndexQualifier(rs2.getString("INDEX_QUALIFIER"));
                index.setIndexName(rs2.getString("INDEX_NAME"));
                index.setType(rs2.getShort("TYPE"));
                index.setOrdinalPosition(rs2.getShort("ORDINAL_POSITION"));
                index.setAscOrDesc(rs2.getString("ASC_OR_DESC"));
                index.setCardinality(rs2.getInt("CARDINALITY"));
                index.getValues().add(col);
                if ("PRIMARY".equalsIgnoreCase(indexName) || indexName.equalsIgnoreCase(
                    rsmd.getTableName(1) + "_pkey")) {
                    index.setIndextype(IndexType.PRIMARY);
                } else if (index.isNonUnique() == false) {
                    index.setIndextype(IndexType.Unique);
                } else {
                    index.setIndextype(IndexType.Normal);
                }
                tm.getAllIndexes().put(indexName, index);

            }
        }
        IndexMeta index = tm.getAllIndexes().get(indexName);
        if (index.getIndextype().value() != 0) {
            if ("H2 JDBC Driver".equals(dbmd.getDriverName())) {
                if (indexName.length() > 11 && "PRIMARY_KEY".equalsIgnoreCase(indexName.substring(0, 11))) {
                    index.setIndextype(IndexType.PRIMARY);
                }
            } else if (dbmd.getDriverName() != null && dbmd.getDriverName().toLowerCase().indexOf("postgresql") >= 0) {
                if ((tableName + "_pkey").equalsIgnoreCase(indexName)) {
                    index.setIndextype(IndexType.PRIMARY);
                }
            }
        }
        return tm;
    }
}

说明:

  • TableMetaCache提供获取表数据的功能,包括缓存功能。
  • 元数据功能包括数据库本身元数据和返回结果元数据。
  • 通过DataSource->Connection->Statement的逻辑执行SQL语句获取ResultSetMetaData结果元数据。
  • 通过Connection获取DatabaseMetaData dbmd数据库元数据
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
5天前
|
云安全 人工智能 运维
阿里云SecOps Agent,全新安全跨产品执行体验
自然语言驱动 云安全中心/WAF/CFW/ 等多款安全产品联动
1603 2
|
3天前
|
人工智能 定位技术 SEO
我学 GEO 第 15 天:终于知道AI GEO该如何做?
我是暴走的莉莉酱,边旅行边研究AI GEO的数字游民。专注普通人如何提升“AI可见度”——让AI在回答用户问题时准确识别、理解并推荐你。不讲玄学,只做可测、可调、可持续的GEO实践。
368 124
|
5天前
|
机器学习/深度学习 人工智能 调度
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
HappyHorse 1.1 是新一代视频生成大模型,全面升级动态表现力、角色一致性、指令遵循、视觉质感与音画协同能力。支持I2V/T2V/R2V三类生成,适配短剧、电商广告、品牌营销等场景,提供高质、流畅、可控的AI视频生产力。
627 4
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
|
3天前
|
缓存 人工智能 运维
阿里云618百炼大模型Qwen3.7-Max功能、免费试用、订阅计费、配置接入详解
Qwen3.7-MAX是阿里云百炼平台推出的通义千问3.7系列旗舰大语言模型,专为智能体时代复杂任务打造,依托阿里云全域算力与自研技术,在逻辑推理、长文本处理、代码工程、长周期自主执行等领域达到行业顶尖水平。2026年618期间,该模型推出多重免费试用权益、按量计费5折、订阅套餐优惠等专属福利,覆盖个人开发者、团队与企业全场景需求,以下从核心功能、免费试用、订阅计费、配置接入四方面展开详细解析。
366 123
|
16天前
|
缓存 测试技术 API
Qwen 3.7 Plus 与 Max 实测:性价比与多模态能力差异解析(2026)
2026 年 6 月 1 日,阿里悄无声息地发布了 Qwen 3.7 Plus,距 Qwen 3.7 Max 上线刚好 11 天。同样的 1M 上下文,同样的 35 小时自治上限。但价格才是头条:Plus 是 0.40/M输入,Max是 2.50/M——便宜约 6 倍——并且还能看图、看视频。Vision Arena 上 Plus 已经排到 #16。所以这周真正值得讨论的问题不是”要不要为视觉能力买单”,而是”Max 凭什么用 6 倍价格换来 2 个百分点的 benchmark 领先”。
|
2天前
|
存储 人工智能 数据可视化
别再手动复制 Skill 了:多 Agent 时代的 Skill 管理方案
多 Agent 场景下 Skill 的统一管理与同步。
187 121
|
9天前
|
缓存 人工智能 运维
GLM 5.2自托管全流程实战:硬件选型、vLLM/SGLang部署与成本盈亏测算
2026年智谱发布GLM 5.2超大混合专家模型,区别于以往仅开放API的闭源大模型,该模型权重以MIT开源协议对外发布,企业与开发者可完整下载、本地审计、私有化部署,实现数据不出环境、自定义微调、自主调度推理资源。GLM 5.2拥有753B总参数,原生支持百万级上下文窗口,在代码生成、长文档推理、数学逻辑等多项基准测试中对标国际顶尖商用模型,是首款可完整自托管的前沿代码向大模型。
748 0
|
2天前
|
SQL 存储 运维
日志能不能改?SLS LogStore 原生支持更新和删除了
随着日志承载的业务语义越来越多,数据订正、回填、清理等需求变得越来越常见。SLS 现已为 LogStore 提供原生 update/delete 能力——支持按 RowID 精确修改,按查询条件批量操作,类似计费调账、标签刷新、反馈回填等场景都可以直接在 LogStore 内完成闭环。
170 123
|
17天前
|
JavaScript 定位技术 API
CodeGraph 爆火:编程 Agent 需要的不是更多上下文,而是一张提前画好的代码地图
CodeGraph 是一款爆火的本地代码智能工具,通过 tree-sitter 解析 AST 构建结构化知识图谱(存于 SQLite),为编程 Agent 提前生成“代码地图”。它显著降低 Agent 在中大型项目中的探索成本——实测工具调用减少71%、Token 降57%、速度提升46%,支持19+语言及主流框架路由识别,完全离线、无需 API Key。
941 12
CodeGraph 爆火:编程 Agent 需要的不是更多上下文,而是一张提前画好的代码地图

热门文章

最新文章