看了源码才知道Seata AT是这样获取数据表的元数据的

本文涉及的产品
RDS AI 助手,专业版
RDS Agent(兼容OpenClaw),2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 看了源码才知道Seata AT是这样获取数据表的元数据的

前言

我们都知道Seata AT是基于前后镜像来实现事务的成功回滚的,前后镜像的生成依赖于数据表的元数据,Seata是如何生成前后镜像的可以看这篇博客:你知道Seata AT模式中前后镜像是如何生成的嘛?

起初我以为数据库Driver提供了现成的API给开发人员获取指定数据表的元数据,今天看了源码才知道,并没有想象中那么简单。下面我们就来一起看看到底是怎么一回事儿。

一探究竟

我们直接展开关键性的seata源码,进入DataSourceProxy.init()方法中:

// 是否允许开启定时任务检查更新元数据
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
    // 开启定时任务,默认一分钟更新检查一下
    tableMetaExecutor.scheduleAtFixedRate(() -> {
        // 获取数据库链接
        try (Connection connection = dataSource.getConnection()) {
             // 更新缓存中的数据表元数据
             TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
        } catch (Exception ignore) {}
   }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
复制代码

Seata AT在创建了DataSourceProxy对象后,马上会启动一个定时任务,一分钟检查一次缓存中的元数据。

跟着关键代码,我们可以追踪到AbstractTableMetaCache类,这个抽象类其实就提供了两个方法:

public abstract class AbstractTableMetaCache implements TableMetaCache {
    @Override
    public TableMeta getTableMeta(final Connection connection, final String tableName, String resourceId) {
        // 如果缓存中有对应数据就返回,否则就去查询元数据并放在缓存中。
    }
    @Override
    public void refresh(final Connection connection, String resourceId) {
        // 更新缓存
    }
}
复制代码

最后我们发现获取数据表元数据的代码实现在fetchSchema()方法中,但是这个方法是一个抽象方法,有多个实现:

image.png

我们就挑一个MysqlTableMetaCache来看一下里面是如何实现的。

@Override
    protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
        String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
        try (Statement stmt = connection.createStatement();
            // 执行SQL语句:SELECT * FROM [tableName] LIMIT 1;
            ResultSet rs = stmt.executeQuery(sql)) {
            // 根据执行结果获取元数据
            return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
        } catch (SQLException sqlEx) {
            throw sqlEx;
        } catch (Exception e) {
            throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
        }
    }
复制代码

根据上面源码,我们发现Seata获取Mysql数据表的元数据竟然是通过SELECT * FROM [tableName] LIMIT 1来的,但是事实并不是我们想象的这么简单,继续深入resultSetMetaToSchema()方法:

private TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd)
        throws SQLException {
        //always "" for mysql
        String schemaName = rsmd.getSchemaName(1);
        String catalogName = rsmd.getCatalogName(1);
        /*
         * 通过ResultSetMetaData获取tableName可以避免以下情况
         *
         * select * from account_tbl
         * select * from account_TBL
         * select * from `account_tbl`
         * select * from account.account_tbl
         */
        String tableName = rsmd.getTableName(1);
        TableMeta tm = new TableMeta();
        tm.setTableName(tableName);
        /*
         * here has two different type to get the data
         * make sure the table name was right
         * 1. show full columns from xxx from xxx(normal)
         * 2. select xxx from xxx where catalog_name like ? and table_name like ?(informationSchema=true)
         */
        // 通过dbmd发送查询语句获取指定表中的所有列信息
        try (ResultSet rsColumns = dbmd.getColumns(catalogName, schemaName, tableName, "%");
             // 发送查询语句获取表中索引信息
             ResultSet rsIndex = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
             // 查询更新行中的任何值时自动更新的列的信息
             ResultSet onUpdateColumns = dbmd.getVersionColumns(catalogName, schemaName, tableName)) {
            // 收集列信息
            while (rsColumns.next()) {
                ColumnMeta col = new ColumnMeta();
                col.setTableCat(rsColumns.getString("TABLE_CAT"));
                col.setTableSchemaName(rsColumns.getString("TABLE_SCHEM"));
                col.setTableName(rsColumns.getString("TABLE_NAME"));
                col.setColumnName(rsColumns.getString("COLUMN_NAME"));
                col.setDataType(rsColumns.getInt("DATA_TYPE"));
                col.setDataTypeName(rsColumns.getString("TYPE_NAME"));
                col.setColumnSize(rsColumns.getInt("COLUMN_SIZE"));
                col.setDecimalDigits(rsColumns.getInt("DECIMAL_DIGITS"));
                col.setNumPrecRadix(rsColumns.getInt("NUM_PREC_RADIX"));
                col.setNullAble(rsColumns.getInt("NULLABLE"));
                col.setRemarks(rsColumns.getString("REMARKS"));
                col.setColumnDef(rsColumns.getString("COLUMN_DEF"));
                col.setSqlDataType(rsColumns.getInt("SQL_DATA_TYPE"));
                col.setSqlDatetimeSub(rsColumns.getInt("SQL_DATETIME_SUB"));
                col.setCharOctetLength(rsColumns.getInt("CHAR_OCTET_LENGTH"));
                col.setOrdinalPosition(rsColumns.getInt("ORDINAL_POSITION"));
                col.setIsNullAble(rsColumns.getString("IS_NULLABLE"));
                col.setIsAutoincrement(rsColumns.getString("IS_AUTOINCREMENT"));
                if (tm.getAllColumns().containsKey(col.getColumnName())) {
                    throw new NotSupportYetException("Not support the table has the same column name with different case yet");
                }
                tm.getAllColumns().put(col.getColumnName(), col);
            }
            while (onUpdateColumns.next()) {
                tm.getAllColumns().get(onUpdateColumns.getString("COLUMN_NAME")).setOnUpdate(true);
            }
            // 收集索引信息
            while (rsIndex.next()) {
                String indexName = rsIndex.getString("INDEX_NAME");
                String colName = rsIndex.getString("COLUMN_NAME");
                ColumnMeta col = tm.getAllColumns().get(colName);
                if (tm.getAllIndexes().containsKey(indexName)) {
                    IndexMeta index = tm.getAllIndexes().get(indexName);
                    index.getValues().add(col);
                } else {
                    IndexMeta index = new IndexMeta();
                    index.setIndexName(indexName);
                    index.setNonUnique(rsIndex.getBoolean("NON_UNIQUE"));
                    index.setIndexQualifier(rsIndex.getString("INDEX_QUALIFIER"));
                    index.setIndexName(rsIndex.getString("INDEX_NAME"));
                    index.setType(rsIndex.getShort("TYPE"));
                    index.setOrdinalPosition(rsIndex.getShort("ORDINAL_POSITION"));
                    index.setAscOrDesc(rsIndex.getString("ASC_OR_DESC"));
                    index.setCardinality(rsIndex.getInt("CARDINALITY"));
                    index.getValues().add(col);
                    if ("PRIMARY".equalsIgnoreCase(indexName)) {
                        index.setIndextype(IndexType.PRIMARY);
                    } else if (!index.isNonUnique()) {
                        index.setIndextype(IndexType.UNIQUE);
                    } else {
                        index.setIndextype(IndexType.NORMAL);
                    }
                    tm.getAllIndexes().put(indexName, index);
                }
            }
            if (tm.getAllIndexes().isEmpty()) {
                throw new ShouldNeverHappenException("Could not found any index in the table: " + tableName);
            }
        }
        return tm;
    }
复制代码

可以发现,在mysql的实现中,我们查询一个表的元数据,需要执行四条SQL语句,另外Oracle和Postgresql实现中,也是要执行三条查询语句的。在数据表变动不是很频繁的情况下,seata遵循读多写少用缓存的原则,并通过定时任务的方式来保持拿到的数据表元数据是最新的。

小结

在seata获取数据表元数据的实现中,我们通过阅读源码的方式,大致收获了以下几点:

1.seata AT模式默认会开启定时任务每分钟更新数据表元数据,这是一个配置项,在确认运行时数据表不会变更的情况下,开发人员可以不开启该定时任务关闭。client.rm.tableMetaCheckEnable=false即可关闭该定时任务。

2.seata获取数据表元数据至少需要进行三次以上的查询,这属于一个比较重的操作。为了避免获取元数据影响业务的吞吐量,seata遵循了读多写少用缓存的原则,来尽可能地降低该操作带来的影响。


作者:梦想实现家_Z

链接:https://juejin.cn/post/7164649327913074702

来源:稀土掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
SQL API 数据库
Seata AT模式问题之抛出异常触发回滚如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题
1213 98
|
监控 数据库
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
921 157
|
监控 API 数据库
Seata常见问题之Seata AT的设计不支持使用临时表如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
存储 Java Apache
Seata 的 AT 模式
Seata 的 AT 模式
Seata的AT模式执行流程
Seata的AT模式执行流程
256 0
|
SQL Java 关系型数据库
Seata之AT模式
Seata之AT模式
|
SQL 关系型数据库 数据库
31-微服务技术栈(高级):分布式事务Seata的AT模式
在分布式架构系统中,服务不止一个,一个完整的业务链路肯定也不止调用一个服务,此时每个服务都有自己的数据库增删改查,而每一个写操作对应一个本地事务。如果想要确保全部的业务状态一致,也就意味着需要所有的本地事务状态一致,这在我们之前的学习中肯定是不具备的,如何做到跨服务、跨数据源的事务一致性将是本章节的重点学习内容。
627 0
|
SQL JSON 自然语言处理
Seata-go 1.1.0 发布,补齐 AT 模式支持
Seata-go 1.1.0 版本补齐了 AT 模式下对 Multi Delete、Multi Update、Insert on Update 和 Select for Update 的支持。至此 Seata-go 的 AT 模式与 Seata AT 模式全面对齐。
Seata-go 1.1.0 发布,补齐 AT 模式支持
|
Java 数据库 微服务
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(五) (mini-cloud) SEATA分布式事务篇(上) 运行原理以及AT模式源码启动版集成
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(五) (mini-cloud) SEATA分布式事务篇(上) 运行原理以及AT模式源码启动版集成
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(五) (mini-cloud) SEATA分布式事务篇(上) 运行原理以及AT模式源码启动版集成