初始化
- SeataDataSourceProxy的创建
在Seata Client初始化过程中,会通过io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration
创建SeataAutoDataSourceProxyCreator
,在SeataAutoDataSourceProxyCreator
的wrapIfNecessary()
方法中,会针对DataSource做一层代理,这样的话,就可以在构建出来的代理对象中为实现分布式事务插入自定义逻辑; - RMClient的初始化
Seata Client初始化的另一部分逻辑中,也就是io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
中,会构建GlobalTransactionScanner
对象,在该对象的初始化方法中,会调用RMClient.init()
,该方法的作用是建立与TC服务的通信,以便后续分支事务的注册、状态上报、提交或回滚;
业务执行过程
在业务执行过程中,在执行数据库操作时,主要关注SeataDataSourceProxy
,也就是AT模式实现类:
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { @Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); } @Override public ConnectionProxy getConnection(String username, String password) throws SQLException { Connection targetConnection = targetDataSource.getConnection(username, password); return new ConnectionProxy(this, targetConnection); } } 复制代码
DataSourceProxy
重写了getConnection()
方法,针对获得的Connection
也做了一个代理层ConnectionProxy
,因为ConnectionProxy
继承了AbstractConnectionProxy
,核心逻辑还是在AbstractConnectionProxy
中:
public abstract class AbstractConnectionProxy implements Connection { @Override public Statement createStatement() throws SQLException { Statement targetStatement = getTargetConnection().createStatement(); // 包装Statement return new StatementProxy(this, targetStatement); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { String dbType = getDbType(); // support oracle 10.2+ PreparedStatement targetPreparedStatement = null; if (BranchType.AT == RootContext.getBranchType()) { List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType); if (sqlRecognizers != null && sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(), sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()); String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()]; tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray); // 调用原生Connection.prepareStatement()实现预编译逻辑 targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray); } } } if (targetPreparedStatement == null) { targetPreparedStatement = getTargetConnection().prepareStatement(sql); } // 同样针对PreparedStatement做一层代理 return new PreparedStatementProxy(this, targetPreparedStatement, sql); } } 复制代码
根据上述源码,可以知道,其实RM层的大体框架就是利用了Proxy方式针对DataSource、Connection、
Statement、PreparedStatement做一层代理,在执行数据库操作的过程中插入分布式事务逻辑;
逐步深入源码,我们需要了解PreparedStatementProxy
的部分实现逻辑:
public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder { // 采用模版模式 @Override public int executeUpdate() throws SQLException { return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate()); } } 复制代码
当执行sql语句的时候,PreparedStatementProxy
采用了模版模式:
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // 没有全局锁,也不是AT模式,直接使用原生statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } // 根据不同的sql语句,使用不同类型的Executor,相当于就是策略模式 Executor<T> executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { case INSERT: // 插入 executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: // 更新 executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: // 删除 executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: // 当前读 executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case INSERT_ON_DUPLICATE_UPDATE: // 没有就插入,重复就更新 switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE"); } break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { // 批量处理 executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { // 执行sql语句,重点看这里 rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; } 复制代码
我们需要重点关注executor.execute(args)
方法,这里使用到的是策略模式,不同的sql语句处理逻辑不一样,不过我们可以找到比较典型的UpdateExecutor
,在它继承的抽象类AbstractDMLBaseExecutor
中:
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> { @Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); // 如果实现没有设置的话,模式是自动提交 if (connectionProxy.getAutoCommit()) { // 默认执行当前分支 return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } } protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { // 把自动提交设置成false connectionProxy.changeAutoCommit(); return new LockRetryPolicy(connectionProxy).execute(() -> { // 执行sql语句 T result = executeAutoCommitFalse(args); // 提交本地事务 connectionProxy.commit(); // 返回执行结果 return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } // 异常抛出 throw e; } finally { // 恢复现场 // 重置connectionContext connectionProxy.getContext().reset(); // 恢复自动提交 connectionProxy.setAutoCommit(true); } } } 复制代码
上述代码使用模版模式完成了sql的执行,但是我们重点需要关注两部分逻辑:
- 执行sql语句
T result = executeAutoCommitFalse(args)
:
protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } // 获取前镜像,也就是sql执行前的快照 TableRecords beforeImage = beforeImage(); // 真正地执行sql语句 T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // 获取影响行数 int updateCount = statementProxy.getUpdateCount(); if (updateCount > 0) { // 如果影响行数大于0,那么获取执行后的快照 TableRecords afterImage = afterImage(beforeImage); // 计算行锁,设置到ConnectionContext中,下面需要取出来 // 计算undoLogItem,后面要用 prepareUndoLog(beforeImage, afterImage); } return result; } 复制代码
- 提交本地事务
connectionProxy.commit()
@Override public void commit() throws SQLException { try { // 重试策略 lockRetryPolicy.execute(() -> { // 真正地提交本地事务 doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { // 失败后回滚 rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } } private void doCommit() throws SQLException { // 如果使用的@GlobalTransaction if (context.inGlobalTransaction()) { // AT模式下,肯定执行当前分支 processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { // 如果使用的@GlobalLock processLocalCommitWithGlobalLocks(); } else { // 不在分布式事务中,原生connection提交 targetConnection.commit(); } } private void processGlobalTransactionCommit() throws SQLException { try { // 向远程TC服务注册当前分支事务,并加上行锁 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try {//插入undoLog UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // 提交本地事务 targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); // 提交失败,上报分支事务状态 report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { // 提交成功,上报分支事务状态 report(true); } // 恢复ConnectionContext context.reset(); } 复制代码
- 综上所述,RM在分支事务执行过程中,主要有以下几步:
1.先获取业务sql执行前的快照BeforeImage
2.执行业务sql
3.如果业务sql影响行数大于0,那么需要获取业务sql执行后的快照AfterImage;并计算行锁以及后续需要插入的undolog;
4.向TC服务注册分支事务,并添加行锁;此处可能会存在锁冲突导致注册失败,直至注册成功或者分布式事务超时;
5.注册成功后,插入undolog日志表记录;
6.提交本地事务,此时业务sql和undolog日志一起提交;
7.上报分支事务状态;
8.恢复现场;