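Remember the AT mode we covered earlier? You can revisit the article "Integrating the distributed transaction middleware Seata with springcloud + eureka", which mainly explains how to use AT mode in a springcloud + eureka microservice setup.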
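In AT mode, a global transaction is built on branch transactions (the transaction of a single service or a single data source). A branch transaction does not itself implement two-phase commit; it emulates two-phase commit with the help of the undo log, which is quite similar in spirit to MySQL's own internal two-phase commit.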
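In phase one, the branch opens a local transaction, executes the business SQL and writes the undo log in that same local transaction, registers the branch with the TC (which takes the global lock), and then commits locally. In phase two, when the TC sends a commit, the branch only needs to delete the undo log; when the TC sends a rollback, it uses the undo log to compensate the business data and then deletes the undo log.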
Taking MySQL as an example, the DDL for the undo_log table is:
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(100) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime NOT NULL COMMENT 'create datetime',
  `log_modified` datetime NOT NULL COMMENT 'modify datetime',
  `ext` varchar(100) DEFAULT NULL COMMENT 'reserved field',
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';
rollback_info is the core field: it stores the data needed for rollback. Here is the rollback_info value of one complete undo log record:
{
  "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
  "xid": "192.168.59.132:8091:34937248742391808",
  "branchId": 34937257391046656,
  "sqlUndoLogs": [
    "java.util.ArrayList",
    [
      {
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "INSERT",
        "tableName": "orders",
        "beforeImage": {
          "@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
          "tableName": "orders",
          "rows": ["java.util.ArrayList", []]
        },
        "afterImage": {
          "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
          "tableName": "orders",
          "rows": [
            "java.util.ArrayList",
            [
              {
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": [
                  "java.util.ArrayList",
                  [
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 2},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "user_id", "keyType": "NULL", "type": 4, "value": 1},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "product_id", "keyType": "NULL", "type": 4, "value": 1},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "pay_amount", "keyType": "NULL", "type": 3, "value": ["java.math.BigDecimal", 1]},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "status", "keyType": "NULL", "type": 12, "value": "INIT"},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "add_time", "keyType": "NULL", "type": 93, "value": ["java.sql.Timestamp", [1596793692000, 0]]},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "last_update_time", "keyType": "NULL", "type": 93, "value": ["java.sql.Timestamp", [1596793692000, 0]]}
                  ]
                ]
              }
            ]
          ]
        }
      }
    ]
  ]
}
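The undo log above records an INSERT. It contains a beforeImage and an afterImage: beforeImage is a snapshot of the data before the write, and afterImage is the data after the write. For an INSERT the beforeImage is empty (EmptyTableRecords) and the afterImage holds the new row, so rolling back means deleting that row by its primary key (id = 2). The type values are java.sql.Types codes (4 = INTEGER, 3 = DECIMAL, 12 = VARCHAR, 93 = TIMESTAMP). With both images recorded, restoring the pre-write state is straightforward.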
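To make the role of the two images concrete, here is a minimal Java sketch of how a compensating statement can be derived from them. UndoEntry and CompensationSketch are hypothetical names, not Seata classes; the sketch assumes a single-column primary key and returns SQL with ? placeholders, leaving value binding out. Seata's own undo executors also check that the current row still matches afterImage before writing; this sketch skips that check.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, simplified stand-in for one SQLUndoLog entry (not Seata's class).
final class UndoEntry {
    final String sqlType;                // "INSERT", "UPDATE" or "DELETE"
    final String tableName;
    final String pkName;                 // assumes a single-column primary key
    final Map<String, Object> beforeRow; // empty for INSERT
    final Map<String, Object> afterRow;  // empty for DELETE

    UndoEntry(String sqlType, String tableName, String pkName,
              Map<String, Object> beforeRow, Map<String, Object> afterRow) {
        this.sqlType = sqlType;
        this.tableName = tableName;
        this.pkName = pkName;
        this.beforeRow = beforeRow;
        this.afterRow = afterRow;
    }
}

final class CompensationSketch {
    // Builds the compensating SQL (with ? placeholders) for one undo entry.
    static String compensatingSql(UndoEntry e) {
        switch (e.sqlType) {
            case "INSERT":
                // The row did not exist before: delete it by the key found in afterImage.
                return "DELETE FROM " + e.tableName + " WHERE " + e.pkName + " = ?";
            case "DELETE": {
                // The row existed before: re-insert every column from beforeImage.
                StringJoiner cols = new StringJoiner(", ");
                StringJoiner marks = new StringJoiner(", ");
                for (String col : e.beforeRow.keySet()) {
                    cols.add(col);
                    marks.add("?");
                }
                return "INSERT INTO " + e.tableName + " (" + cols + ") VALUES (" + marks + ")";
            }
            case "UPDATE": {
                // Restore the non-key columns to their beforeImage values.
                StringJoiner sets = new StringJoiner(", ");
                for (String col : e.beforeRow.keySet()) {
                    if (!col.equals(e.pkName)) {
                        sets.add(col + " = ?");
                    }
                }
                return "UPDATE " + e.tableName + " SET " + sets + " WHERE " + e.pkName + " = ?";
            }
            default:
                throw new IllegalArgumentException("unsupported sqlType: " + e.sqlType);
        }
    }
}
```

For the record above (sqlType INSERT on orders, id = 2), the sketch yields DELETE FROM orders WHERE id = ?, with 2 bound from the afterImage.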
Now that we are familiar with how AT mode works in Seata, let's look at how the undo log is implemented in the Seata source code.
ResourceManager
The component that actually calls the undo log APIs to drive a branch's commit/rollback is the ResourceManager. First, here is the UML class diagram for ResourceManager:
In AT mode the implementation is DataSourceManager. It handles the branch transaction's commit/rollback, which in turn triggers the DML operations on the undo_log table.
Let's start with the commit code:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
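This delegates to AsyncWorker.branchCommit. That method does no work synchronously: it only puts the branch into a queue and immediately reports PhaseTwo_Committed. This is safe because the business data was already committed locally in phase one; a phase-two commit only has to clean up the undo log. The code: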
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
    }
    return BranchStatus.PhaseTwo_Committed;
}
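The enqueue-and-return-immediately behavior can be sketched with a bounded java.util.concurrent queue. PendingBranch and AsyncCommitBufferSketch are hypothetical names, and the capacity is an arbitrary parameter rather than Seata's real buffer size; the point is that offer() never blocks, and a full queue only leaves the undo_log row behind for later cleanup.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for Phase2Context: only what is needed to delete the undo log later.
final class PendingBranch {
    final String xid;
    final long branchId;
    final String resourceId;

    PendingBranch(String xid, long branchId, String resourceId) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
    }
}

final class AsyncCommitBufferSketch {
    private final BlockingQueue<PendingBranch> buffer;

    AsyncCommitBufferSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Same shape as AsyncWorker.branchCommit: enqueue without blocking, report success at once.
    String branchCommit(String xid, long branchId, String resourceId) {
        if (!buffer.offer(new PendingBranch(xid, branchId, resourceId))) {
            // Queue full: the undo_log row simply stays behind for a later cleanup.
            System.err.println("buffer full, branch " + branchId + "/" + xid + " left for housekeeping");
        }
        return "PhaseTwo_Committed";
    }

    // In the real code a scheduled task drains the queue; exposed here for inspection.
    PendingBranch poll() {
        return buffer.poll();
    }

    int pending() {
        return buffer.size();
    }
}
```

With a capacity of 2, three calls all return "PhaseTwo_Committed", but only the first two branches are queued.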
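The actual phase-two work happens in the method below. A scheduled thread pool calls it repeatedly; it drains the Phase2Context entries from the queue and deletes the corresponding undo logs in batches: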
private void doBranchCommits() {
    // ... (code omitted)
    // Drain every Phase2Context in the queue into a HashMap called mappedContexts.
    // The key is the resourceId, e.g. jdbc:mysql://192.168.59.1:3306/seata_order
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
    while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
        Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
        contextsGroupedByResourceId.add(commitContext);
    }

    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
        Connection conn = null;
        DataSourceProxy dataSourceProxy;
        try {
            try {
                // resourceManager holds a map of every branch's DataSourceProxy
                DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                    .getResourceManager(BranchType.AT);
                dataSourceProxy = resourceManager.get(entry.getKey());
                if (dataSourceProxy == null) {
                    throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                }
                // Get the raw connection for this resourceId; it is a plain java.sql.Connection
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                continue;
            }
            List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
            Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                xids.add(commitContext.xid);
                branchIds.add(commitContext.branchId);
                int maxSize = Math.max(xids.size(), branchIds.size());
                if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) { // once 1000 entries accumulate, batch-delete their undo logs
                    try {
                        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                            xids, branchIds, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                    }
                    xids.clear();
                    branchIds.clear();
                }
            }
            // ... (code omitted)
            // Delete the remaining entries (fewer than 1000)
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);
            // ... (code omitted)
        }
    }
}
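The grouping and chunking in doBranchCommits can be isolated from the JDBC plumbing. The Java sketch below only plans the batches: it groups drained entries by resourceId and cuts a batch whenever either set reaches the limit, then emits the remainder. BatchDeletePlanner, Ctx and Batch are hypothetical names; in the real code the limit is UNDOLOG_DELETE_LIMIT_SIZE (1000) and each batch becomes one batchDeleteUndoLog call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BatchDeletePlanner {
    // Hypothetical stand-in for Phase2Context.
    static final class Ctx {
        final String resourceId;
        final String xid;
        final long branchId;

        Ctx(String resourceId, String xid, long branchId) {
            this.resourceId = resourceId;
            this.xid = xid;
            this.branchId = branchId;
        }
    }

    // One planned DELETE against one data source.
    static final class Batch {
        final String resourceId;
        final Set<String> xids;
        final Set<Long> branchIds;

        Batch(String resourceId, Set<String> xids, Set<Long> branchIds) {
            this.resourceId = resourceId;
            this.xids = xids;
            this.branchIds = branchIds;
        }
    }

    static List<Batch> plan(List<Ctx> drained, int limit) {
        // Step 1: group by resourceId, like mappedContexts in doBranchCommits.
        Map<String, List<Ctx>> byResource = new HashMap<>();
        for (Ctx c : drained) {
            byResource.computeIfAbsent(c.resourceId, k -> new ArrayList<>()).add(c);
        }
        // Step 2: per data source, cut a batch whenever either set reaches the limit.
        List<Batch> batches = new ArrayList<>();
        for (Map.Entry<String, List<Ctx>> entry : byResource.entrySet()) {
            Set<String> xids = new LinkedHashSet<>();
            Set<Long> branchIds = new LinkedHashSet<>();
            for (Ctx c : entry.getValue()) {
                xids.add(c.xid);
                branchIds.add(c.branchId);
                if (Math.max(xids.size(), branchIds.size()) == limit) {
                    batches.add(new Batch(entry.getKey(), xids, branchIds));
                    xids = new LinkedHashSet<>();
                    branchIds = new LinkedHashSet<>();
                }
            }
            // Step 3: the remainder that is smaller than the limit.
            if (!branchIds.isEmpty()) {
                batches.add(new Batch(entry.getKey(), xids, branchIds));
            }
        }
        return batches;
    }
}
```

With a limit of 2, five entries for one data source and two for another produce four batches: 2 + 2 + 1 for the first and a single batch of 2 for the second.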
Next, let's look at the relevant rollback code:
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    // ... (code omitted)
    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    // ... (code omitted)
    return BranchStatus.PhaseTwo_Rollbacked;
}
As you can see, this code simply calls UndoLogManager's undo method to perform the undo-log-based rollback.
From the commit and rollback logic above, both paths go through UndoLogManager: commit calls batchDeleteUndoLog and rollback calls undo. In the next section we look at how UndoLogManager implements them.
UndoLogManager
The UML class diagram for the undo log classes is as follows:
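There is an interface, UndoLogManager, and a default abstract implementation, AbstractUndoLogManager. This part uses the template method pattern: the abstract base class leaves some abstract methods for each database to implement. Here I mainly cover the MySQL implementation, whose class is MySQLUndoLogManager.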
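As a reminder of how the template method pattern splits responsibilities, here is an illustrative Java sketch. UndoLogSqlTemplate, MySqlUndoLogSql and OracleUndoLogSql are hypothetical names and do not mirror Seata's actual class layout; the base class fixes the shape of the undo_log insert, and each dialect only supplies its current-time expression. A real dialect may differ in more than this one hook.

```java
// Illustrative template-method layout; class and method names are hypothetical.
abstract class UndoLogSqlTemplate {
    // Template method: fixes the shape of the statement; subclasses cannot override it.
    final String insertUndoLogSql(String table) {
        String now = currentTimeExpression();
        return "INSERT INTO " + table
            + " (branch_id, xid, context, rollback_info, log_status, log_created, log_modified)"
            + " VALUES (?, ?, ?, ?, ?, " + now + ", " + now + ")";
    }

    // Primitive operation left to each database dialect.
    protected abstract String currentTimeExpression();
}

final class MySqlUndoLogSql extends UndoLogSqlTemplate {
    @Override
    protected String currentTimeExpression() {
        return "now()";
    }
}

final class OracleUndoLogSql extends UndoLogSqlTemplate {
    @Override
    protected String currentTimeExpression() {
        return "sysdate";
    }
}
```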
1. Rolling back and deleting the undo log
Both undo and batchDeleteUndoLog live in the parent class AbstractUndoLogManager. First, the source of batchDeleteUndoLog:
// Batch-delete rows from undo_log by branch_id and xid
public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException {
    if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
        return;
    }
    int xidSize = xids.size();
    int branchIdSize = branchIds.size();
    String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
    try (PreparedStatement deletePST = conn.prepareStatement(batchDeleteSql)) {
        int paramsIndex = 1;
        for (Long branchId : branchIds) {
            deletePST.setLong(paramsIndex++, branchId);
        }
        for (String xid : xids) {
            deletePST.setString(paramsIndex++, xid);
        }
        int deleteRows = deletePST.executeUpdate();
        // ... (code omitted)
    }
    // ... (code omitted)
}

// The batch delete SQL is easy to read: DELETE FROM undo_log WHERE branch_id IN (...) AND xid IN (...)
protected static String toBatchDeleteUndoLogSql(int xidSize, int branchIdSize) {
    StringBuilder sqlBuilder = new StringBuilder(64);
    sqlBuilder.append("DELETE FROM ").append(UNDO_LOG_TABLE_NAME).append(" WHERE ").append(
        ClientTableColumnsName.UNDO_LOG_BRANCH_XID).append(" IN ");
    appendInParam(branchIdSize, sqlBuilder);
    sqlBuilder.append(" AND ").append(ClientTableColumnsName.UNDO_LOG_XID).append(" IN ");
    appendInParam(xidSize, sqlBuilder);
    return sqlBuilder.toString();
}
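As the comments show, this code is simple: it deletes rows from the undo_log table in batch, selecting them by branch transaction id (branch_id) and global transaction id (xid).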
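The body of appendInParam is not shown above, so the Java sketch below assumes it simply appends a parenthesized list of ? placeholders; BatchDeleteSqlSketch is a hypothetical name, and the column names branch_id and xid are taken from the DDL rather than from ClientTableColumnsName. It shows the statement that toBatchDeleteUndoLogSql produces and why branch ids must be bound before xids.

```java
final class BatchDeleteSqlSketch {
    // Hypothetical helper (assumed behavior of appendInParam): appends "(?,?,...,?)".
    static void appendInParam(int size, StringBuilder sb) {
        sb.append('(');
        for (int i = 0; i < size; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('?');
        }
        sb.append(')');
    }

    // Branch ids come first, then xids, matching the binding order in batchDeleteUndoLog.
    static String toBatchDeleteUndoLogSql(String table, int xidSize, int branchIdSize) {
        StringBuilder sb = new StringBuilder(64);
        sb.append("DELETE FROM ").append(table).append(" WHERE branch_id IN ");
        appendInParam(branchIdSize, sb);
        sb.append(" AND xid IN ");
        appendInParam(xidSize, sb);
        return sb.toString();
    }
}
```

For one global transaction with three branches, this gives DELETE FROM undo_log WHERE branch_id IN (?,?,?) AND xid IN (?), and parameters 1 to 3 are the branch ids while parameter 4 is the xid.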
Next, the undo source code. Its job is to use the undo_log data to compensate the tables touched by the branch transaction:
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            // MySQL auto-commits by default, so switch to manual commit for this transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) { // load the undo_log rows first; if any exist, roll back with them
                exists = true;

                // It is possible that the server repeatedly sends a rollback request to roll back
                // the same branch transaction to multiple processes,
                // ensuring that only the undo_log in the normal state is processed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) { // not in Normal state (already handled): nothing to undo, return
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }

                // The context column names the serializer; default: serializer=jackson
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString); // {"serializer": "jackson"}
                byte[] rollbackInfo = getRollbackInfo(rs); // rollback_info (shown at the start of this article) as a byte array

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                // Fall back to the default parser (jackson) when no serializer is recorded
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); // decode rollback_info into a BranchUndoLog

                try {
                    // put serializer name to local (e.g. jackson)
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        // Reverse the order: a transaction is rolled back from its last statement to its first,
                        // the same idea as MySQL applying its own undo log backwards.
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name (clear the current serializer, jackson)
                    removeCurrentSerializer();
                }
            }

            // If undo_log exists, it means that the branch transaction has completed the first phase,
            // we can directly roll back and clean the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction,
            // causing undo_log not to be written to the database.
            // For example, the business processing timeout, the global transaction is the initiator rolls back.
            // To ensure data consistency, we can insert an undo_log with GlobalFinished state
            // to prevent the local transaction of the first phase of other programs from being correctly submitted.
            // See https://github.com/seata/seata/issues/489

            if (exists) { // compensation done: delete the undo log
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                // No undo log found: insert one with state GlobalFinished, so that a late phase-one
                // commit of this branch fails on the (xid, branch_id) unique key
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }

            return;
        } // the catch and finally blocks that follow are omitted
    }
}
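I have commented this code in detail. The main logic is: read the rollback_info of the branch from the undo_log table; if it exists, use it to roll back and then delete the undo log; if it does not exist, insert an undo log with state GlobalFinished. That marker blocks a late phase-one commit of the same branch, because its own undo_log insert would violate the unique key ux_undo_log (xid, branch_id).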
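The three outcomes of undo (compensate and delete, ignore a non-Normal row, or leave a GlobalFinished marker) can be simulated without a database. In the Java sketch below, UndoDecisionSketch is a hypothetical name, a HashMap keyed by xid and branch id stands in for the undo_log table and its unique key, and putIfAbsent plays the role of a duplicate-key failure. Statements are plain strings, and "compensating" one just records it in a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UndoDecisionSketch {
    static final int NORMAL = 0;          // log_status 0: written by phase one
    static final int GLOBAL_FINISHED = 1; // log_status 1: defense marker

    static final class Row {
        final int status;
        final List<String> sqlUndoLogs;   // statements recorded in phase one, in execution order

        Row(int status, List<String> sqlUndoLogs) {
            this.status = status;
            this.sqlUndoLogs = sqlUndoLogs;
        }
    }

    // Hypothetical in-memory undo_log table; the key plays the role of UNIQUE KEY (xid, branch_id).
    final Map<String, Row> undoLog = new HashMap<>();
    // Statements whose compensation has been applied, in the order it happened.
    final List<String> compensated = new ArrayList<>();

    static String key(String xid, long branchId) {
        return xid + "#" + branchId;
    }

    // Phase one: returns false when the key already exists (a duplicate-key error in MySQL).
    boolean phaseOneInsert(String xid, long branchId, List<String> statements) {
        return undoLog.putIfAbsent(key(xid, branchId), new Row(NORMAL, statements)) == null;
    }

    // Phase-two rollback, following the three outcomes of AbstractUndoLogManager.undo.
    void undo(String xid, long branchId) {
        String k = key(xid, branchId);
        Row row = undoLog.get(k);
        if (row == null) {
            // Nothing written yet: leave a GlobalFinished marker to block a late phase-one insert.
            undoLog.put(k, new Row(GLOBAL_FINISHED, Collections.emptyList()));
            return;
        }
        if (row.status != NORMAL) {
            return; // canUndo(state) == false: already handled, ignore the repeated request
        }
        List<String> reversed = new ArrayList<>(row.sqlUndoLogs);
        Collections.reverse(reversed); // undo the last statement first
        compensated.addAll(reversed);
        undoLog.remove(k);             // deleteUndoLog after compensating
    }
}
```

A branch that recorded "insert orders" then "update stock" is compensated as "update stock" then "insert orders"; a rollback that arrives before phase one leaves a marker, the late phase-one insert is then rejected, and a repeated rollback is ignored.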
2. Inserting the undo log
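We have covered rolling back and deleting the undo log, so when is the undo log written? We already saw the insert of an undo log with state GlobalFinished; now let's look at how an undo log with state Normal is written. That method is also in AbstractUndoLogManager, but tracing the call path to it takes some effort.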
Recall the microservice example from before: each service has its own data source configuration, as follows:
@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource hikariDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource hikariDataSource) {
        // Wrap the Hikari pool so that Seata can intercept every SQL statement
        return new DataSourceProxy(hikariDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
            .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}
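Note that the DataSource handed to MyBatis is not the connection pool itself: the Hikari pool is wrapped in Seata's DataSourceProxy, and the proxy takes over transaction handling. In the same way, Seata provides PreparedStatementProxy to wrap PreparedStatement and ConnectionProxy to wrap Connection.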
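The wrapping idea can be shown with a plain decorator. In this Java sketch, SimpleStatement, PlainStatement and ImageCapturingStatement are hypothetical names; a one-method interface stands in for java.sql.PreparedStatement so the example runs without a driver. The proxy implements the same interface, delegates to the real statement, and does extra work around the call, in the order AT mode needs: before image, business SQL, after image.

```java
import java.util.List;

// Hypothetical one-method interface standing in for java.sql.PreparedStatement.
interface SimpleStatement {
    int executeUpdate(String sql);
}

// The "real" statement handed out by the pool; it only records what it ran.
final class PlainStatement implements SimpleStatement {
    private final List<String> trace;

    PlainStatement(List<String> trace) {
        this.trace = trace;
    }

    @Override
    public int executeUpdate(String sql) {
        trace.add("execute: " + sql);
        return 1;
    }
}

// A proxy in the spirit of PreparedStatementProxy: same interface, wraps the target,
// and adds work around the real call.
final class ImageCapturingStatement implements SimpleStatement {
    private final SimpleStatement target;
    private final List<String> trace;

    ImageCapturingStatement(SimpleStatement target, List<String> trace) {
        this.target = target;
        this.trace = trace;
    }

    @Override
    public int executeUpdate(String sql) {
        trace.add("query before image");
        int rows = target.executeUpdate(sql);
        trace.add("query after image, buffer undo log");
        return rows;
    }
}
```

Because callers only see the interface, MyBatis keeps calling executeUpdate as usual while the proxy adds the undo log bookkeeping.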
When JDBC executes SQL, it calls PreparedStatement's execute method (or executeQuery/executeUpdate). In Seata this call goes to PreparedStatementProxy, as follows:
@Override
public boolean execute() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}

@Override
public ResultSet executeQuery() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
}

@Override
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
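Following execute further down, we reach AbstractDMLBaseExecutor.doExecute, which in turn goes through ConnectionProxy.setAutoCommit and ConnectionProxy.doCommit. The relevant code is: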
// AbstractDMLBaseExecutor
public T doExecute(Object... args) throws Throwable {
    // MySQL defaults to autoCommit = true, so the first branch is taken
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // Calls setAutoCommit below; switching to false does not enter its if branch
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            // Same path as autoCommit = false: before image, business SQL, after image, buffered undo log
            T result = executeAutoCommitFalse(args);
            // ConnectionProxy.commit() -> doCommit(): in a global transaction this flushes the undo log and commits
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        // Switching back to true enters setAutoCommit's if branch and calls doCommit() again.
        // The context was just reset, so doCommit falls through to its final else
        // (targetConnection.commit()); the work itself was already committed above.
        connectionProxy.setAutoCommit(true);
    }
}

// ConnectionProxy
public void setAutoCommit(boolean autoCommit) throws SQLException {
    // First call (autoCommit = false): the if is skipped and the target connection is simply
    // switched to manual commit. If the connection was already autoCommit = false,
    // executeAutoCommitTrue is never entered in the first place.
    if (autoCommit && !getAutoCommit()) {
        // change autocommit from false to true, we should commit() first according to JDBC spec.
        doCommit();
    }
    targetConnection.setAutoCommit(autoCommit);
}

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // Inside a global transaction: register the branch, flush the undo log, then commit both together
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // Global lock required without a global transaction: check the global lock, then commit locally
        processLocalCommitWithGlobalLocks();
    } else {
        // Plain local commit. targetConnection is the real Connection;
        // with the HikariCP pool used here it is a HikariProxyConnection
        targetConnection.commit();
    }
}

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register(); // register the branch with the TC; the lock keys are sent along
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // entry point for inserting the undo log
        // A single commit covers both the business SQL and the undo_log insert
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}
I have commented this code in detail, but it is still fairly involved, so here is a flowchart as well, again based on my local MySQL test setup:
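With that, the code is much easier to follow. Note the order: in phase one of Seata AT mode, the business SQL runs first and the undo_log insert (flushUndoLogs) runs afterward, but both are committed by the single targetConnection.commit() in processGlobalTransactionCommit. The business data and its undo log are therefore committed atomically in one local transaction.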
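The call order can be checked with a small simulation of the autoCommit = true path. FakeTargetConnection and AtCommitFlowSketch are hypothetical names; images, lock retry, branch reporting and error handling are left out. The trace shows the undo_log insert executed after the business SQL but before the first commit, so one commit covers both; the second commit comes from setAutoCommit(true) after the context reset and has nothing new to write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fake of the physical connection: records every call in order.
final class FakeTargetConnection {
    final List<String> calls = new ArrayList<>();
    boolean autoCommit = true;

    void setAutoCommit(boolean value) {
        autoCommit = value;
        calls.add("setAutoCommit(" + value + ")");
    }

    void execute(String sql) {
        calls.add("execute: " + sql);
    }

    void commit() {
        calls.add("commit");
    }
}

// Simplified model of executeAutoCommitTrue plus ConnectionProxy.setAutoCommit/doCommit.
final class AtCommitFlowSketch {
    private final FakeTargetConnection target;
    private String xid;                                        // non-null while in a global transaction
    private final List<String> undoBuffer = new ArrayList<>(); // undo items buffered on the connection

    AtCommitFlowSketch(FakeTargetConnection target, String xid) {
        this.target = target;
        this.xid = xid;
    }

    void executeAutoCommitTrue(String businessSql) {
        setAutoCommit(false);                       // true -> false: no commit is triggered
        try {
            target.execute(businessSql);            // business SQL (images omitted here)
            undoBuffer.add("undo for " + businessSql);
            doCommit();                             // connectionProxy.commit()
        } finally {
            xid = null;                             // context.reset()
            undoBuffer.clear();
            setAutoCommit(true);                    // false -> true: doCommit() runs again
        }
    }

    private void setAutoCommit(boolean autoCommit) {
        if (autoCommit && !target.autoCommit) {
            doCommit();
        }
        target.setAutoCommit(autoCommit);
    }

    private void doCommit() {
        if (xid != null) {
            // processGlobalTransactionCommit: register, flush undo log, one commit for both
            target.calls.add("register branch " + xid);
            for (String undo : undoBuffer) {
                target.execute("insert undo_log: " + undo);
            }
            target.commit();
        } else {
            target.commit();                        // plain commit; nothing new to write here
        }
    }
}
```

Running it with xid x1 and the statement UPDATE t records: setAutoCommit(false), execute: UPDATE t, register branch x1, execute: insert undo_log: undo for UPDATE t, commit, commit, setAutoCommit(true).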
Summary
1. The undo log code in Seata AT mode is somewhat involved; stepping through it in a debugger a few times makes it clear.
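2. Seata executes the branch's business SQL first and writes the undo log afterward, but in the same local transaction, so the two are committed together; that local commit is phase one of AT mode's two-phase commit.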
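3. Before the branch transaction commits, the undo log's PreparedStatement.executeUpdate is executed. Why? Because the insert runs inside the same local transaction: if the undo log SQL has a problem (for example a value exceeding a column's length), an exception is thrown before the commit, and the branch transaction is not committed.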
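4. Writing the undo log is not asynchronous: while the business SQL runs, the undo items are buffered on the connection, and flushUndoLogs writes them just before the local commit of the branch. What is asynchronous is the phase-two cleanup: after a global commit, deleting the undo logs is handed to AsyncWorker.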
5. The key logic is in the executeAutoCommitTrue method of AbstractDMLBaseExecutor; it is worth reading carefully.