事务的执行
当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理,可阅读博客:java代理模式的这些细节,你知道多少?
@Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... // 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法 return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); } 复制代码
TransactionAspectSupport.invokeWithinTransaction()
@Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。 // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); // 获取Transactional注解参数 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 从容器中获取TransactionManager实例 final TransactionManager tm = determineTransactionManager(txAttr); if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { throw new TransactionUsageException( "Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead."); } ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType()); } return new ReactiveTransactionSupport(adapter); }); return txSupport.invokeWithinTransaction( method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); } // TransactionManager转换成PlatformTransactionManager类型 PlatformTransactionManager ptm = asPlatformTransactionManager(tm); // 获取方法身份,类似com.github.AccountServiceImpl.save。 // 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 如果是声明式事务 if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 获取事务,这里面就要考虑spring的事务传播机制 // *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点***** TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 执行目标方法 // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 遇到异常的情况如何处理事务,要结合事务的传播机制来看。 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo cleanupTransactionInfo(txInfo); } // 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常 if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { // 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 最后就是准备提交事务。 commitTransactionAfterReturning(txInfo); return retVal; } // 下面是编程式事务 else { Object result; final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } } 复制代码
总结下来,重点就下面几个方法
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 调用目标方法前,根据情况确定要不要创建新的事务 TransactionInfo txInfo = createTransactionIfNecessary(); Object retVal; try{ // 调用目标方法 retVal = invocation.proceedWithInvocation(); }catch(Throwable ex){ // 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等 completeTransactionAfterThrowing(txInfo, ex); throw ex; }finally{ // 清除线程中当前事务信息 cleanupTransactionInfo(txInfo); } // 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等 commitTransactionAfterReturning(txInfo); return retVal; } 复制代码
createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 下面就是包装了一下 // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } // *****这是重点,获取事务状态****** TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 这里就是再包装一层返回TransactionInfo对象 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } @Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // Use defaults if no transaction definition given. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null; Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // *******************这是重点,需要结合事务传播机制来看******************* return handleExistingTransaction(def, transaction, debugEnabled); } // 检测超时时间是否合法 // Check definition settings for new transaction. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 下面都是第一次调用。说明之前不存在任何事务。 // No existing transaction found -> check propagation behavior to find out how to proceed. // 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || //PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功 // 如果内部事务成功,外部事务失败,那么一起回滚 // 如果内部事务成功,外部事务成功,那么一起提交 // 如果内部事务失败,外部事务失败,那么一起回滚 // 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 挂起上一个事务,目前不存在上一个事务,那么直接传null SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 开启新事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } // 剩下的传播机制,都创建空事务 else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息 return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } } // 开启新事务 private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 创建一个新事务, transaction里面的connectionHolder=null DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开启新事务 doBegin(transaction, definition); // 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理 prepareSynchronization(status, definition); return status; } @Override protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 因为是新的事务,所以需要创建新的链接Connection if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 获取新的Connection Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } // 把新的Connection设置进transaction中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 设置readOnly和isolationLevel Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 如果是自动提交,切换到手动提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { // 把新创建出来的ConnectionHolder绑定到当前线程中 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { // 出异常了就释放链接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }