前面已经创建过事务的情况:
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 那么遇到不允许有事务的传播机制,就直接抛异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // PROPAGATION_NOT_SUPPORTED就是以非事务的方式执行,那么我们就需要先把当前的事务挂起来 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 挂起当前事务 // 1.把当前线程的connectionHolder解绑并返回connectionHolder // 2.取出当前事务同步器中的所有信息,并重置线程同步器信息 // 3.把connectionHolder和相关信息封装成一个SuspendedResourcesHolder对象返回 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 创建一个DefaultTransactionStatus返回 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 如果是PROPAGATION_REQUIRES_NEW,那么必须要创建一个新的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } // 挂起当前事务 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { // 开启新事务 return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // 如果是PROPAGATION_NESTED,那么就是嵌套事务。就要使用回滚点 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. // 同样的还是创建一个DefaultTransactionStatus对象 DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 创建回滚点后返回 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. // 针对JTA事务,创建一个新的事务来处理 return startTransaction(definition, transaction, debugEnabled, null); } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } // 剩下的传播机制,还是当前事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); } 复制代码
总结一下:
1.createTransactionIfNecessary():当代理对象方法被调用时,获取TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification,利用这三个参数来创建事务TransactionInfo
- 1.1.tm.getTransaction():获取事务
- 1.1.1:doGetTransaction():创建新的DataSourceTransactionObject,并从当前线程中查找connectionHolder,不管有没有找到,都设置进DataSourceTransactionObject对象中。
- 1.1.2:isExistingTransaction():判断是否已经存在事务,其实就是判断DataSourceTransactionObject中是否有connectionHolder,并且事务是否是激活的。
- 1.1.3:handleExistingTransaction():如果已经存在事务。-----------这是逻辑分支------------
- 1.1.3.1:如果存在事务,当传播机制是PROPAGATION_NEVER,那么直接抛异常。
- 1.1.3.2:如果存在事务,当传播机制是PROPAGATION_NOT_SUPPORTED,那么直接挂起当前事务。创建DefaultTransactionStatus对象,并且更新当前线程事务信息。
- 1.1.3.3:如果存在事务,当传播机制是PROPAGATION_REQUIRES_NEW,那么先挂起当前事务,并且创建新的连接设置进新事务中,并通过新创建的transaction封装一个DefaultTransactionStatus对象,同时更新当前线程中事务信息。
- 1.1.3.4:如果存在事务,当传播机制是PROPAGATION_NESTED,那么直接封装一个DefaultTransactionStatus对象,更新当前线程事务信息,并设置保存点后返回。
- 1.1.3.5:如果存在事务,当传播机制是PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY、PROPAGATION_REQUIRED,那么直接创建一个DefaultTransactionStatus对象,更新当前线程的事务信息后返回。
- 1.1.4:如果不存在事务,传播机制是PROPAGATION_MANDATORY,就抛异常。该传播机制的要求是当前一定要存在事务才能继续执行,否则就抛异常。-----------这是逻辑分支------------
- 1.1.5:如果不存在事务,传播机制是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,那么意味着就需要自己创建事务。-----------这是逻辑分支------------
- 1.1.5.1:suspend(null):挂起空事务。就是把当前线程中的事务相关信息都取出来封装到一个SuspendedResourcesHolder对象中并返回。并且重置当前线程中事务相关信息,通过TransactionSynchronizationManager来设置。
- 1.1.5.2:startTransaction():开启新事务
- 1.1.5.2.1:doBegin():创建新的ConnectionHolder,并设置进transaction中,开启手动提交,最后绑定到当前线程中
- 1.1.5.2.2:prepareSynchronization():创建DefaultTransactionStatus对象,更新当前线程中的信息。因为此时已经创建了新的事务了。
- 1.1.6:如果不存在事务,传播机制是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER,那么就直接创建一个DefaultTransactionStatus对象返回。同时更新一下当前线程中的信息。-----------这是逻辑分支------------
- 1.2.prepareTransactionInfo():把TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification打包成TransactionInfo对象,并且把TransactionStatus设置进去,最后把oldTransactionInfo解绑并取出来保存,把最新的TransactionInfo和当前线程绑定
protected static final class TransactionInfo { private void bindToThread() { // 从当前线程中取出来,然后把新的TransactionInfo和当前线程绑定 this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); } } 复制代码
- 其实总结一下上面的逻辑,createTransactionIfNecessary()最终创建了一个TransactionInfo对象,但是整个过程重点在于通过getTransaction()方法获取TransactionStatus对象,创建这个对象重点取决于创建Transaction,然后这个对象里面其实是包含了最关键的connectionHolder。
completeTransactionAfterThrowing:捕获执行目标代码后抛出的异常,理论上应该是在这里面做rollback
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // 如果这个抛出的异常正好匹配上Transaction注解上指定的异常,那么执行回滚操作 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { // 如果异常匹配不上,那么还是依然执行提交。如果TransactionStatus里面的状态被标记为需要回滚的话,那么还是依然回滚,否则就提交 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } } } private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); // 检查是否有保存点,如果有保存点,就重置到保存点 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } // 重置到保存点 status.rollbackToHeldSavepoint(); } // 如果是新的事务,那么就直接回滚 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 其实是拿出里面的ConnectionHolder来调用rollback doRollback(status); } else { // Participating in larger transaction // 如果不是新的事务,那么为了避免多次回滚,只需要将当前的rollback标记设置成true,这样的话,当上层代理catch这个抛出去的异常时,会检查rollback标记,最后才会回滚。 if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 解绑connectionHolder,重置connectionHolder,并且放回链接池。把挂起的事务恢复。 cleanupAfterCompletion(status); } } 复制代码
cleanupTransactionInfo(txInfo);这个方法其实只是把线程里面的TransactionInfo对象替换成了oldTransactionInfo对象,并没有修改当前的TransactionInfo对象。
commitTransactionAfterReturning
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // 其实在这里面还是会判断TransactionStatus里面是否有回滚状态,如果有的话,依然会回滚,否则就提交 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } 复制代码
- 如果捕捉到异常,那么直接进入到completeTransactionAfterThrowing()方法中。从completeTransactionAfterThrowing()方法出来,把异常抛出去之前,还要执行finally中的cleanupTransactionInfo()方法
- completeTransactionAfterThrowing()方法:
- 1.判断当前异常是否匹配注解设置的异常,是的话,进入
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); 复制代码
- 1.1:判断是否有保存点,关联到传播机制为PROPAGATION_NESTED,如果有保存点,那么直接回滚到保存点
- 1.2:判断是否是新事务,关联到传播机制为PROPAGATION_REQUIRES_NEW,那么直接执行回滚操作
- 1.3:判断不是新事务,那么直接设置rollback标记,避免多次回滚。因为异常会一直往外抛,这个时候只需要设置回滚标记即可。
- 1.4:最后都会cleanupAfterCompletion():修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.如果没有匹配到这个异常的话,那么进入
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); 复制代码
- 2.1:判断事务是否被标记为回滚,那么说明内层事务之前已经设置了回滚标记,此时当前事务是新事务的话,就直接回滚。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.2:如果当前事务被标记回滚,并且不是新事务的话,那么继续标记回滚,并且抛出异常。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.3:如果不满足以上条件,那么直接提交。如果有保存点,那么直接释放保存点。如果是新事务,直接提交。如果提交失败,那么回滚。最后进行清理工作。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。