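Earlier posts covered three topics: how the TM works in Seata AT mode, the main responsibilities of the TC in a Seata distributed transaction, and how the RM executes SQL statements in AT mode. This post walks through the source code to show how a distributed transaction is actually committed or rolled back.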
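In Seata AT mode, the distributed (global) transaction is committed only after every branch transaction has committed successfully. On the branch side, each local commit goes through ConnectionProxy.processGlobalTransactionCommit: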
```java
public class ConnectionProxy extends AbstractConnectionProxy {
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // Register the branch transaction and acquire its row locks (effectively a distributed lock)
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // Insert the undo log (committed together with the business SQL below)
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // Commit the local transaction
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // Report to TC that the branch transaction failed to commit
            report(false);
            // Rethrow; the TM eventually catches it and triggers a global rollback
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // Report to TC that the branch transaction committed successfully
            report(true);
        }
        // Restore the context
        context.reset();
    }
}
```
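The ordering inside processGlobalTransactionCommit is what makes an AT rollback possible. The global row lock is taken first, and the undo log is committed in the same local transaction as the business change. The code below is a minimal in-memory sketch of that ordering, not Seata code. `PhaseOneSketch`, its lock map, row map, undo list and report list are hypothetical stand-ins for TC's lock table, the business table, the undo log and the branch reports.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of AT phase one; none of these names are Seata APIs.
class PhaseOneSketch {
    static final Map<String, String> globalLocks = new HashMap<>(); // lock key -> owning XID (TC side)
    static final Map<String, Integer> rows = new HashMap<>();       // business table
    static final List<String> undoLog = new ArrayList<>();          // undo log records
    static final List<String> reports = new ArrayList<>();          // branch reports sent to TC

    static class LocalCommitException extends RuntimeException {
        LocalCommitException(String msg) { super(msg); }
    }

    // Stand-in for register(): fails if another global transaction holds the row lock
    static void register(String xid, String lockKey) {
        String owner = globalLocks.putIfAbsent(lockKey, xid);
        if (owner != null && !owner.equals(xid)) {
            throw new IllegalStateException("lock conflict on " + lockKey + ", held by " + owner);
        }
    }

    static void commitBranch(String xid, String row, int newValue, boolean failLocalCommit) {
        // 1. Register the branch and take the global row lock before touching local data
        register(xid, row);
        // 2. Prepare the before image; it belongs to the same local transaction as the change
        String undoRecord = xid + ":" + row + ":" + rows.get(row);
        if (failLocalCommit) {
            // 3a. Local commit failed: neither the change nor the undo record is persisted
            reports.add(xid + "=PhaseOne_Failed");
            throw new LocalCommitException("local commit failed for " + xid);
        }
        // 3b. Local commit succeeded: the undo record and the change become visible together
        undoLog.add(undoRecord);
        rows.put(row, newValue);
    }
}
```

Because the lock is checked before any local write, a conflicting branch fails without leaving a half-written row or an orphaned undo record.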
On the TM side, the whole global transaction is driven by TransactionalTemplate.execute:

```java
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get the configuration collected from the @GlobalTransactional annotation
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get the current global transaction: null means this is the launcher of the global
    //     transaction; non-null means this is a participant
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Get the propagation level. It mirrors Spring's propagation levels; the difference is that
    //     Spring manages local transactions while this manages global ones, but the idea is the same
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // This switch only handles the propagation level
        switch (propagation) {
            // NOT_SUPPORTED: if a global transaction exists, suspend it, then run the business logic
            case NOT_SUPPORTED:
                // A global transaction exists: suspend it first
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Run the business logic
                return business.execute();
            // REQUIRES_NEW: always run in a new global transaction; suspend the current one first
            case REQUIRES_NEW:
                // If a global transaction exists, suspend it and create a new one
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // break instead of return, so the common code after the switch still runs the business logic
                break;
            // SUPPORTS: without a global transaction, just run the business logic; otherwise run it inside it
            case SUPPORTS:
                // No global transaction: run the business logic directly
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Otherwise run it as part of the global transaction
                break;
            // REQUIRED: join the current global transaction if there is one, otherwise create a new one
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            // NEVER: throw as soon as a global transaction is found; run normally only when there is none
            case NEVER:
                // A global transaction exists: throw
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                            , tx.getXid()));
                } else {
                    // No global transaction: run the business logic
                    return business.execute();
                }
            // MANDATORY: a global transaction must exist, otherwise throw
            case MANDATORY:
                // No global transaction: throw
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // Propagation handling is done; everything below is shared by all levels
        // 1.3 If there is no current global transaction, create one. In that case we are the launcher,
        //     i.e. the TM itself; otherwise we cannot be called the `TM`
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // Prepare to do the work: put this method's global lock config into the current thread and
        // take out the config that was already there; it is put back once our work is done
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If we are the launcher, talk to TC and obtain an XID; otherwise this step does nothing.
            //    The XID can be read from RootContext
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Run the business logic
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. On any exception, start the rollback handling
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. Everything went fine: ask for the global transaction to be committed
            commitTransaction(tx);

            return rs;
        } finally {
            // 5. Restore the context: put the previous config back
            resumeGlobalLockConfig(previousConfig);
            // Trigger callbacks
            triggerAfterCompletion();
            // Clean up
            cleanUp();
        }
    } finally {
        // Resume the previously suspended transaction
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
```
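The propagation switch above boils down to a small decision table. The sketch below restates it as a pure function so each level can be checked at a glance. The enum values mirror Seata's `Propagation`, while `Action` and `decide` are hypothetical names introduced only for illustration.

```java
// Hypothetical restatement of the propagation switch in execute(); not Seata code.
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    enum Action {
        RUN_WITHOUT_GLOBAL_TX,        // just call business.execute()
        SUSPEND_THEN_RUN_WITHOUT_TX,  // tx.suspend(), then business.execute()
        JOIN_EXISTING,                // participant: reuse the current XID
        BEGIN_NEW,                    // launcher: begin a new global transaction
        SUSPEND_THEN_BEGIN_NEW,       // suspend the current one, then launch a new one
        THROW                         // TransactionException
    }

    static Action decide(Propagation p, boolean existing) {
        switch (p) {
            case NOT_SUPPORTED:
                return existing ? Action.SUSPEND_THEN_RUN_WITHOUT_TX : Action.RUN_WITHOUT_GLOBAL_TX;
            case REQUIRES_NEW:
                return existing ? Action.SUSPEND_THEN_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return existing ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_GLOBAL_TX;
            case REQUIRED:
                return existing ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                return existing ? Action.THROW : Action.RUN_WITHOUT_GLOBAL_TX;
            case MANDATORY:
                return existing ? Action.JOIN_EXISTING : Action.THROW;
            default:
                throw new IllegalArgumentException("Not Supported Propagation: " + p);
        }
    }
}
```

Only the two BEGIN_NEW outcomes make this method the launcher, that is, the TM. With JOIN_EXISTING the current `tx` is a participant, so its later `commit()` and `rollback()` calls return early because of the Participant role check.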
When any RM branch transaction throws an exception that propagates back to the TM's business method, the TM calls completeTransactionAfterThrowing(txInfo, tx, ex).
- Rollback on exception
```java
// TransactionalTemplate
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
    // If the exception matches the configured rollback rules, roll back; otherwise still commit the global transaction
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        try {
            // Roll back the global transaction
            rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // The rollback failed: throw
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    } else {
        // Not a rollback exception: commit the global transaction anyway
        commitTransaction(tx);
    }
}

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    // Callback hook, empty by default
    triggerBeforeRollback();
    // Roll back
    tx.rollback();
    // Callback hook, empty by default
    triggerAfterRollback();
    // Throw even when the rollback succeeded, presumably to tell the developer that a rollback happened here
    throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
        ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
```
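`txInfo.rollbackOn(originalException)` decides between rollback and commit, but its internals are not shown here. The sketch below assumes Spring-style semantics. Rules built from the annotation's `rollbackFor`/`noRollbackFor` settings are matched against the exception's class hierarchy, and the closest match wins. `RollbackRuleSketch` and `Rule` are hypothetical. The behaviour for an empty or non-matching rule list is also an assumption, flagged in the comments.

```java
import java.util.List;

// Simplified, hypothetical model of rollback-rule matching; not Seata's TransactionInfo.
class RollbackRuleSketch {
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback; // true: rollbackFor-style rule, false: noRollbackFor-style rule
        Rule(Class<? extends Throwable> type, boolean rollback) { this.type = type; this.rollback = rollback; }
    }

    // Number of superclass steps from exClass up to type, or -1 if type is not an ancestor
    static int depth(Class<?> exClass, Class<?> type) {
        int d = 0;
        for (Class<?> c = exClass; c != null; c = c.getSuperclass(), d++) {
            if (c == type) {
                return d;
            }
        }
        return -1;
    }

    static boolean rollbackOn(Throwable ex, List<Rule> rules) {
        if (rules.isEmpty()) {
            return true; // assumption: with no rules configured, every exception rolls back
        }
        Rule winner = null;
        int closest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int d = depth(ex.getClass(), rule.type);
            if (d >= 0 && d < closest) { // the closest matching rule wins
                closest = d;
                winner = rule;
            }
        }
        // assumption: if rules exist but none matches, the exception does not trigger a rollback
        return winner != null && winner.rollback;
    }
}
```

With `rollbackFor = RuntimeException` and `noRollbackFor = IllegalArgumentException`, an `IllegalArgumentException` matches the no-rollback rule at distance 0. It therefore leads to a commit, while an `IllegalStateException` still rolls back.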
```java
// DefaultGlobalTransaction
@Override
public void rollback() throws TransactionException {
    // A participant does nothing here: this is the TM's rollback, the RM's rollback lives elsewhere
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of rollback
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    // Retry loop for requesting the global rollback
    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                retry--;
                // The core call that requests the global rollback
                status = transactionManager.rollback(xid);
                // Success: leave the loop
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                // Give up only after every retry has failed
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        // If the rolled-back transaction is the current one, unbind its XID from the current thread
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] rollback status: {}", xid, status);
    }
}

// DefaultTransactionManager
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    // Build the request asking TC to roll back the given global transaction
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}
```
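Both `rollback()` here and the `commit()` shown later share the same shape. Participants return immediately. The launcher retries the TC call up to a configured count, falling back to a default when that count is not positive. The XID is unbound from the thread in a `finally` block. The sketch below isolates that loop. `RetrySketch`, `Role`, `boundXid` and the `Supplier` standing in for the RPC are hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical model of the retry loop in rollback()/commit(); not Seata code.
class RetrySketch {
    enum Role { LAUNCHER, PARTICIPANT }

    static String boundXid; // stand-in for the XID that RootContext binds to the thread

    /** Returns the status reported by the simulated TC, or null when a participant skips the call. */
    static String callTcWithRetry(Role role, String xid, int configuredRetry, int defaultRetry,
                                  Supplier<String> tcCall) {
        if (role == Role.PARTICIPANT) {
            return null; // only the launcher talks to TC
        }
        int retry = configuredRetry <= 0 ? defaultRetry : configuredRetry;
        String status = null;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    status = tcCall.get(); // the RPC to TC
                    break;                 // the first success ends the loop
                } catch (RuntimeException ex) {
                    if (retry == 0) {      // every attempt failed
                        throw new IllegalStateException("Failed to report to TC", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(boundXid)) {
                boundXid = null;           // like suspend(): unbind the XID from the thread
            }
        }
        return status;
    }
}
```

Note that the retries only cover the TM-to-TC request. Whether the branches are actually rolled back is decided later by TC and the RMs.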
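- A rollback is only started when the exception matches the rollback rules configured on the annotation (`txInfo.rollbackOn(originalException)` returns true); otherwise the global transaction is still committed;
- The number of rollback retries is configurable: the rollback request fails only if every attempt fails, and it succeeds as soon as one attempt succeeds;
- The rollback initiated by the TM is just one global-rollback request sent to TC; the TM itself performs no database operations;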
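Put together with the `catch` block in `execute()`, these points mean the caller always sees an exception. What differs is whether TC was first asked to roll back or to commit. Below is a minimal sketch of that outcome table, using a hypothetical `Outcome` enum and a `Predicate` standing in for `txInfo.rollbackOn(...)`.

```java
import java.util.function.Predicate;

// Hypothetical summary of the exception path in execute(); not Seata code.
class ExceptionPathSketch {
    enum Outcome {
        ROLLBACK_DONE,            // ExecutionException with Code.RollbackDone
        ROLLBACK_RETRYING,        // ExecutionException with Code.RollbackRetrying
        COMMITTED_THEN_RETHROWN   // global commit, then execute() rethrows the original exception
    }

    /**
     * @param rollbackOn      stand-in for txInfo.rollbackOn(ex)
     * @param tcStillRetrying stand-in for GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
     */
    static Outcome onBusinessException(Throwable ex, Predicate<Throwable> rollbackOn,
                                       boolean tcStillRetrying) {
        if (rollbackOn.test(ex)) {
            // tx.rollback() runs here; an ExecutionException is thrown either way
            return tcStillRetrying ? Outcome.ROLLBACK_RETRYING : Outcome.ROLLBACK_DONE;
        }
        // No rollback rule matched: the global transaction is committed instead
        return Outcome.COMMITTED_THEN_RETHROWN;
    }
}
```

The sketch ignores the case where that fallback commit itself fails. In the source, the commit's own `ExecutionException` would then replace the original exception.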
- Global transaction commit
```java
// TransactionalTemplate
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        // Callback hook, empty by default
        triggerBeforeCommit();
        // Commit the global transaction
        tx.commit();
        // Callback hook, empty by default
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 The commit threw an exception: the commit failed
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
}

// DefaultGlobalTransaction
@Override
public void commit() throws TransactionException {
    // A mere participant does nothing here: there can be only one TM
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    // The global commit is retried as well
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                retry--;
                // Request the global commit
                status = transactionManager.commit(xid);
                // Success: leave the loop
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                // Retries exhausted and still failing: throw
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        // If the committed transaction is the current one, clear its XID from the current thread
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}

// DefaultTransactionManager
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // Send the global commit request to TC
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}
```
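`commitTransaction` wraps `tx.commit()` between two hooks. If the commit fails, the after-commit hook is skipped because the exception leaves the `try` block first. The small sketch below records the hook order. `GlobalTx` is a hypothetical single-method stand-in for `GlobalTransaction`, and the returned `Code` replaces the real `ExecutionException`.

```java
import java.util.List;

// Hypothetical model of commitTransaction(); not Seata code.
class CommitHookSketch {
    enum Code { OK, COMMIT_FAILURE }

    interface GlobalTx { void commit() throws Exception; } // stand-in for GlobalTransaction

    static Code commitTransaction(GlobalTx tx, List<String> events) {
        try {
            events.add("beforeCommit");   // triggerBeforeCommit()
            tx.commit();                  // no-op for participants, RPC to TC for the launcher
            events.add("afterCommit");    // triggerAfterCommit(), skipped if commit() throws
            return Code.OK;
        } catch (Exception e) {
            // The source wraps the TransactionException into ExecutionException(Code.CommitFailure)
            return Code.COMMIT_FAILURE;
        }
    }
}
```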
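- The number of global commit retries is configurable as well;
- Committing a global transaction is likewise just the TM telling TC, over RPC, that the global transaction identified by this XID can be committed;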
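Because the TM may retry, TC can receive the same commit for one XID more than once, for example when the first response was lost. The sketch below assumes a TC-side handler that starts phase two only while the session is still `Begin` and otherwise just reports the current status. In this model, that is what makes retrying harmless. The class, the map and the counter are hypothetical, and the enum names only borrow Seata's `GlobalStatus` vocabulary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TC-side commit handler that tolerates duplicate requests; not Seata code.
class IdempotentCommitSketch {
    enum GlobalStatus { Begin, Committing, Committed, Finished }

    static final Map<String, GlobalStatus> sessions = new HashMap<>(); // XID -> session status
    static int phaseTwoStarts = 0;

    static GlobalStatus commit(String xid) {
        GlobalStatus current = sessions.get(xid);
        if (current == null) {
            return GlobalStatus.Finished; // no session left: treat the transaction as already ended
        }
        if (current == GlobalStatus.Begin) {
            sessions.put(xid, GlobalStatus.Committing);
            phaseTwoStarts++;             // only the first request starts phase two
            return GlobalStatus.Committing;
        }
        return current;                   // duplicate request: report the current status
    }
}
```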
When TC receives the commit request, it ends up in DefaultCore.doGlobalCommit, which commits the branches one by one:

```java
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // Start reading here: forEach iterates over every branch of the global transaction
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            // Everything above only checks status; the branch commit starts here
            try {
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                // The following check can be ignored for this discussion
                if (isXaerNotaTimeout(globalSession,branchStatus)) {
                    LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                    branchStatus = BranchStatus.PhaseTwo_Committed;
                }
                switch (branchStatus) {
                    // The branch has been committed successfully: remove it
                    case PhaseTwo_Committed:
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    // The branch failed to commit and cannot be retried: mark the global transaction as failed
                    case PhaseTwo_CommitFailed_Unretryable:
                        //not at branch
                        SessionHelper.endCommitFailed(globalSession, retrying);
                        LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                        return false;

                    default:
                        // Put the global session into the retry-commit queue
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        // If it can be committed asynchronously, skip it; the scheduled task will handle it
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
                            // What is left is a failed commit
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    // On an exception, queue the global session so the commit is retried later
                    globalSession.queueToRetryCommit();
                    // Rethrow so that the failure is reported back to the TM
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        if (result != null) {
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
        if (!retrying) {
            // Mark the global transaction as committed
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
    // executed to improve concurrency performance, and the global transaction ends..
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession, retrying);
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}
```
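Stripped of metrics, Saga handling, async commit and the `retrying` flag, the loop above applies one rule per branch:

- skip branches that failed phase one;
- remove branches that report `PhaseTwo_Committed`;
- stop for good on `PhaseTwo_CommitFailed_Unretryable`;
- hand anything else to the retry queue.

The simplified sketch below models that rule. `Branch` and the `branchCommit` function are hypothetical stand-ins for the branch session and the RPC to the RM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of the branch loop in doGlobalCommit(); not Seata code.
class GlobalCommitLoopSketch {
    enum BranchStatus {
        PhaseOne_Done, PhaseOne_Failed,
        PhaseTwo_Committed, PhaseTwo_CommitFailed_Retryable, PhaseTwo_CommitFailed_Unretryable
    }

    static final class Branch {
        final long id;
        final BranchStatus status; // status after phase one
        Branch(long id, BranchStatus status) { this.id = id; this.status = status; }
    }

    /** Returns true once every branch is removed; false means "failed for good" or "retry later". */
    static boolean doGlobalCommit(List<Branch> branches, Function<Branch, BranchStatus> branchCommit,
                                  List<String> log) {
        for (Branch b : new ArrayList<>(branches)) {     // iterate over a copy so we can remove
            if (b.status == BranchStatus.PhaseOne_Failed) {
                branches.remove(b);                      // nothing was committed locally
                continue;
            }
            BranchStatus result = branchCommit.apply(b); // RPC to the RM in the real code
            switch (result) {
                case PhaseTwo_Committed:
                    branches.remove(b);
                    break;
                case PhaseTwo_CommitFailed_Unretryable:
                    log.add("endCommitFailed:" + b.id);
                    return false;
                default:
                    log.add("queueToRetryCommit:" + b.id);
                    return false;
            }
        }
        return branches.isEmpty();
    }
}
```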
TC's branch commit reaches the RM as a BranchCommitRequest, which is handled in AbstractRMHandler:

```java
public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {

    // The RM receives a branch commit request
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // Template method pattern
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                // Commit the branch transaction
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // Commit the branch transaction
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        // Fill in the response
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
}
```
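The body of `getResourceManager().branchCommit(...)` is not shown above. In AT mode the business data was already committed in phase one, so phase-two commit only has to remove the undo log records. As far as I know, Seata's AT resource manager queues this cleanup for an asynchronous worker and answers TC with `PhaseTwo_Committed` right away. The sketch below models that idea. `AsyncUndoCleaner`, its queue and `flush` are hypothetical. A real implementation would run the flush on a scheduler and delete rows from the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of AT phase-two commit on the RM side; not Seata code.
class AsyncUndoCleaner {
    enum BranchStatus { PhaseTwo_Committed }

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    final List<String> undoLog = new ArrayList<>(); // "xid#branchId" keys still in the undo log

    // Answer TC immediately; the undo log cleanup happens later
    BranchStatus branchCommit(String xid, long branchId) {
        pending.offer(xid + "#" + branchId);
        return BranchStatus.PhaseTwo_Committed;
    }

    // A scheduler would call this periodically; it removes at most batchSize queued records
    int flush(int batchSize) {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch, batchSize);
        undoLog.removeAll(batch);
        return batch.size();
    }
}
```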
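To sum up, the overall flow is:
- 1.1: The branch transaction executes successfully;
- 1.2: The branch transaction fails;
- 2.1: If any RM branch transaction throws an exception, the TM asks the TC server, via RPC, to roll back the global transaction;
- 2.2: Only when every RM has committed its branch transaction does the TM ask the TC server, via RPC, to commit the global transaction;
- 3.1: On receiving the TM's commit request, TC looks up all branch RMs and sends a commit request to each of them in turn;
- 3.2: On receiving the TM's rollback request, TC looks up all branch RMs and sends a rollback request to each of them in turn;
- 4.1: On receiving TC's commit request, the RM commits its branch transaction;
- 4.2: On receiving TC's rollback request, the RM rolls back its branch transaction.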
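The numbered steps can be traced end to end with a toy simulation. It assumes a single in-process TM, TC and set of RMs, so there is no RPC, no lock table and no undo log. `AtFlowSketch`, `Rm` and `FakeRm` are hypothetical, and branch ordering is simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical end-to-end model of steps 1.x to 4.x; not Seata code.
class AtFlowSketch {
    interface Rm {
        String name();
        boolean phaseOne(); // 1.1 success / 1.2 failure
    }

    static final class FakeRm implements Rm {
        private final String name;
        private final boolean ok;
        FakeRm(String name, boolean ok) { this.name = name; this.ok = ok; }
        public String name() { return name; }
        public boolean phaseOne() { return ok; }
    }

    static List<String> run(List<? extends Rm> rms) {
        List<String> trace = new ArrayList<>();
        List<Rm> branches = new ArrayList<>(); // branches TC will call in phase two
        boolean failed = false;
        for (Rm rm : rms) {
            if (!rm.phaseOne()) {              // 1.2: the exception propagates to the TM
                failed = true;
                break;
            }
            branches.add(rm);                  // 1.1: simplification, only committed branches are tracked
        }
        // 2.1 / 2.2: the TM sends exactly one global request to TC
        trace.add(failed ? "TM->TC:rollback" : "TM->TC:commit");
        // 3.1 / 3.2: TC fans the decision out; 4.1 / 4.2: each RM finishes its branch
        for (Rm rm : branches) {
            trace.add(rm.name() + (failed ? ":rollback" : ":commit"));
        }
        return trace;
    }
}
```

For example, if `order` succeeds and `stock` fails, the trace is `TM->TC:rollback, order:rollback`, and any later RM never runs its phase one.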