前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。
一、初始化数据源元数据信息
可以看到io.seata.rm.datasource.DataSourceProxy中的构造函数会执行初始化方法
public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) { if (targetDataSource instanceof SeataDataSourceProxy) { targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource(); } this.targetDataSource = targetDataSource; //执行初始化 init(targetDataSource, resourceGroupId); }
执行初始化方法会提取相关信息:
//执行初始化 private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; //获取相关数据源信息 try (Connection connection = dataSource.getConnection()) { jdbcUrl = connection.getMetaData().getURL(); dbType = JdbcUtils.getDbType(jdbcUrl); if (JdbcConstants.ORACLE.equals(dbType)) { userName = connection.getMetaData().getUserName(); } } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } //注册数据源 DefaultResourceManager.get().registerResource(this); if (ENABLE_TABLE_META_CHECKER_ENABLE) { tableMetaExcutor.scheduleAtFixedRate(() -> { //获取数据远连接 try (Connection connection = dataSource.getConnection()) { //执行刷新表元数据缓存 TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()) .refresh(connection, DataSourceProxy.this.getResourceId()); } catch (Exception ignore) { } }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS); } //Set the default branch type to 'AT' in the RootContext. //设置默认分支类型AT到root上下文中 RootContext.setDefaultBranchType(this.getBranchType()); }
可以看到 mysql 获取schema
// mysql 获取schema @Override protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException { // 获取其中的一条,执行sql查询,然后设置元数据信息到schema中 String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1"; try (Statement stmt = connection.createStatement(); ResultSet rs = stmt.executeQuery(sql)) { //将结果集元数据设置到schema中 return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData()); } catch (SQLException sqlEx) { throw sqlEx; } catch (Exception e) { throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e); } }
设置的结果集元数据中可以看到:schemaName、catalogName、tableName、TableMeta、ColumnMeta、IndexMeta。
同时将表信息放入到缓存中:
Cache<String, TableMeta> TABLE_META_CACHE = Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE) .expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build(); TABLE_META_CACHE.put(entry.getKey(), tableMeta);
二、sql识别器
可以看到sql识别器会根据对应sql类型执行sql操作:
switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; }
三、一阶段sql执行器前后操作
可以看到在io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse中会执行几个重要的操作
生成前镜像、执行sql、生成后镜像、准备undo log日志数据
/** * Execute auto commit false t. * * @param args the args * @return the t * @throws Exception the exception */ protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } LOGGER.info("----执行自动提交 false------"); LOGGER.info("----生成前镜像------"); TableRecords beforeImage = beforeImage(); LOGGER.info("----执行sql操作------"); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); LOGGER.info("----生成后镜像------"); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; }
之后执行二阶段处理提交
四、二阶段提交
提交sql,如果没有发生异常,则删除undo log日志。否则,执行回滚操作,执行undo log日志,也即通过镜像sql执行复原数据操作。