Seata学习整理二

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。

前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。


一、初始化数据源元数据信息

可以看到io.seata.rm.datasource.DataSourceProxy中的构造函数会执行初始化方法

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        //执行初始化
        init(targetDataSource, resourceGroupId);
    }

执行初始化方法会提取相关信息:

//执行初始化
    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        //获取相关数据源信息
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        //注册数据源
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                //获取数据远连接
                try (Connection connection = dataSource.getConnection()) {
                    //执行刷新表元数据缓存
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }
        //Set the default branch type to 'AT' in the RootContext.
        //设置默认分支类型AT到root上下文中
        RootContext.setDefaultBranchType(this.getBranchType());
    }

可以看到 mysql 获取schema

// mysql 获取schema
    @Override
    protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
        // 获取其中的一条,执行sql查询,然后设置元数据信息到schema中
        String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
        try (Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(sql)) {
            //将结果集元数据设置到schema中
            return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
        } catch (SQLException sqlEx) {
            throw sqlEx;
        } catch (Exception e) {
            throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
        }
    }

设置的结果集元数据中可以看到:schemaName、catalogName、tableName、TableMeta、ColumnMeta、IndexMeta。

同时将表信息放入到缓存中:

Cache<String, TableMeta> TABLE_META_CACHE = Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE)
    .expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build();
TABLE_META_CACHE.put(entry.getKey(), tableMeta);


二、sql识别器

可以看到sql识别器会根据对应sql类型执行sql操作:

switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }


三、一阶段sql执行器前后操作

可以看到在io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse中会执行几个重要的操作

生成前镜像、执行sql、生成后镜像、准备undo log日志数据

/**
     * Execute auto commit false t.
     *
     * @param args the args
     * @return the t
     * @throws Exception the exception
     */
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        LOGGER.info("----执行自动提交 false------");
        LOGGER.info("----生成前镜像------");
        TableRecords beforeImage = beforeImage();
        LOGGER.info("----执行sql操作------");
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        LOGGER.info("----生成后镜像------");
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

之后执行二阶段处理提交


四、二阶段提交

提交sql,如果没有发生异常,则删除undo log日志。否则,执行回滚操作,执行undo log日志,也即通过镜像sql执行复原数据操作。


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
8月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
8月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
146 0
|
8月前
|
存储 Oracle 关系型数据库
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
98 0
|
8月前
|
开发框架 Java 数据库连接
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(下)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
116 0
|
8月前
|
Java 数据库连接 mybatis
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)(下)
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
76 0
|
存储 缓存 uml
Seata的TCC模式学习整理
Seata的TCC模式学习整理
94 0
|
缓存 Java Spring
Seata中的TCC模式学习一
Seata中的TCC模式学习一
146 0
|
存储 SpringCloudAlibaba Java
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(上)
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)
187 0
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(上)
|
SQL SpringCloudAlibaba Java
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(下)
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)
173 0
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(下)
|
Nacos
【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密
【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密
244 0