关联博文:
你认真研究过Spring中的@EnableTransactionManagement注解吗?
接上文SpringBoot中事务执行原理分析(三)我们分析过事务信息对象的获取后,本文我们继续分析目标方法执行后事务的提交流程,即 commitTransactionAfterReturning(txInfo);。
如下所示TransactionAspectSupport的事务提交动作交给了事务管理器来处理。
// org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { //如果事务信息对象或者事务状态对象为null,则do nothing if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
事务提交动作交给了AbstractPlatformTransactionManager的commit方法。
@Override public final void commit(TransactionStatus status) throws TransactionException { //如果事务已经完成,则抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 判断rollbackOnly,本文这里是false if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } //触发rollBack processRollback(defStatus, false); return; } //shouldCommitOnGlobalRollbackOnly默认为false ,后者则判断ConnectionHolder中的rollbackOnly属性 if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 其实核心是这里,前面只是做了回滚机制校验 processCommit(defStatus); }
方法如上所示,首先校验事务完成状态,然后对回滚机制做了校验,最后才是核心方法processCommit(defStatus);
。
【1】 核心方法processCommit
AbstractPlatformTransactionManager的processCommit方法。
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; // 默认是空方法 prepareForCommit(status); //遍历触发TransactionSynchronization的beforeCommit方法 triggerBeforeCommit(status); //遍历触发TransactionSynchronization的beforeCompletion方法 triggerBeforeCompletion(status); //修改标志为为true beforeCompletionInvoked = true; //如果有Savepoint,释放持有的savepoint if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } //如果是一个新事务 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); //执行实际的提交,触发Connection的commit,这个执行完毕数据将会进入数据库 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { // 如果failEarlyOnGlobalRollbackOnly为true,默认为false unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit //遍历触发TransactionSynchronization 的afterCompletion 状态是STATUS_ROLLED_BACK triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit //如果提交失败导致的回滚,则触发回滚 if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { //遍历触发TransactionSynchronization 的afterCompletion 状态是STATUS_UNKNOWN triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } //如果提交失败导致的回滚,则触发回滚 doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { //遍历触发TransactionSynchronization的afterCommit方法 triggerAfterCommit(status); } finally { //遍历触发TransactionSynchronization的afterCompletion方法,状态是STATUS_COMMITTED triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { //完成后清理事务信息 cleanupAfterCompletion(status); } }
方法简单来讲分为四个部分:
triggerXXXX触发TransactionSynchronization事务同步器的各种钩子函数,如果我们想做一些处理就可以实现这个接口或者继承自TransactionSynchronizationAdapter,像SqlSessionSynchronization一样。
savepoint的处理,如果持有savepoint则释放savepoint;
doCommit(status);执行实际的提交,这里会交给Connection去处理,执行完数据就进入了数据库;
cleanupAfterCompletion(status),处理完毕清理一些信息。
事务同步各种钩子我们另起篇章分析,我们这里看一下事务的提交和最后的清理。
【2】 事务真正提交doCommit
事务提交如下所示,其核心是将动作交给了数据库连接来处理commit。
//org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit @Override protected void doCommit(DefaultTransactionStatus status) { //获取到持有的事务对象 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); //从事务对象获取持有的连接信息 Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { //提交动作交给连接 con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
这里最终会下发下面这个命令执行事务的提交:
this.session.execSQL(null, "commit", -1, null, false, this.nullStatementResultSetFactory, null, false);
【3】 最后的清理cleanupAfterCompletion
实现将当前事务状态信息设置为已完成,然后判断是新事务则触发事务同步管理器的clear,会清空线程本地副本资源。如果是新事务还会重置并释放连接、重置ConnectionHolder属性为默认值。最后判断是否有挂起资源,如果有则恢复。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion private void cleanupAfterCompletion(DefaultTransactionStatus status) { //设置事务状态已完成 status.setCompleted(); //如果是新事务,触发事务同步管理器的clear,会清空线程本地副本资源 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } //如果是新事务触发doCleanupAfterCompletion if (status.isNewTransaction()) { // 重置并释放连接,重置ConnectionHolder属性为默认值 doCleanupAfterCompletion(status.getTransaction()); } //如果挂起资源不为null,则恢复挂起的资源 if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }
DataSourceTransactionManager的doCleanupAfterCompletion方法
//org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion @Override protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // Remove the connection holder from the thread, if exposed. if (txObject.isNewConnectionHolder()) { // 如果是新的connectionHolder则从 本地线程上下文对象resources移除当前持有的ConnectionHolder TransactionSynchronizationManager.unbindResource(obtainDataSource()); } // Reset connection. 重置连接,为释放连接做准备 Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } //如果newConnectionHolder 为true,则释放连接 DataSourceUtils.releaseConnection(con, this.dataSource); } //清理ConnectionHolder,也就是恢复其属性为默认值 txObject.getConnectionHolder().clear(); }
ConnectionHolder的clear方法如下所示,其还触发了父类的clear方法。简单来讲,其就是恢复了成员的默认值。
//ConnectionHolder @Override public void clear() { super.clear(); this.transactionActive = false; this.savepointsSupported = null; this.savepointCounter = 0; } //ResourceHolderSupport public void clear() { this.synchronizedWithTransaction = false; this.rollbackOnly = false; this.deadline = null; }