Spring事务管理
OK,了解到了代理模式的好处之后,就可以自然而然的想到Spring事务管理的基本原理了。无非是借助动态代理,做一些额外的操作:在真正的方法执行之前开启事务,在方法顺利执行完成之后提交事务,如果方法执行过程中发生异常,要执行回滚操作。
一般有两种方法使用Spring的事务管理,一种是利用注解,一种是手动编程。本文因为使用的是注解型的事务管理,因此只对注解型事务的使用和原理进行探究,以找到事务没有生效的原因。
▐ 注解型事务
- 注解型事务使用
注解型事务的使用非常简单,只需要在需要事务管理的类或方法上加上@Transactional(rollbackFor = Exception.class),其中rollbackFor指定的是遇到什么情况下执行回滚。
当然这个注解还有传播级别、隔离级别等我们背过的八股文属性,就不一一展开了。
- 事务执行原理
PlatformTransactionManager
我们知道,事务是针对数据库而言的。事实上,Spring并不直接管理事务,他只是提供了一套统一的框架和接口,具体由不同的、真正与数据库打交道的持久层框架来实现,如Hibernate、Ibatis、Mybatis等,这个统一的接口是PlatformTransactionManager,它提供了三个核心方法:获取事务(getTransaction)、提交事务(commit)、回滚事务(rollback),同时Spring提供了一个统一的抽象实现类AbstractPlatformTransactionManager,这是其他持久层框架事务实现类的共同父类。
public interface PlatformTransactionManager extends TransactionManager { /** * 获取事务 /* TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; /** * 提交事务 /* void commit(TransactionStatus status) throws TransactionException; /** * 回滚事务 /* void rollback(TransactionStatus status) throws TransactionException; }
先来看getTransaction方法:
@Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 获取事务定义(该类保存了当前事务的相关属性,例如传播规则、隔离规则等) TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 获取真正的事务(doGetTransaction方法由具体的持久层框架实现) Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 如果已经存在事务,则根据传播规则执行相应的处理 if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } // 检查当前事务的超时时间 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 如果当前不存在事务,检查传播级别和隔离级别,开始执行事务 startTransaction() if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
小结:getTransaction方法就是根据具体的传播行为返回一个当前存在的事务或者一个新的事务对象
再来看提交事务方法commit:
public final void commit(TransactionStatus status) throws TransactionException { // 如果当前事务已经完成,抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 如果当前事务设置了回滚标识,则进行回滚操作 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } // 如果全局事务设置了回滚标识,则进行回滚操作(也就是事务嵌套的场景) if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 提交事务 processCommit(defStatus); }
小结:commit方法就是根据事务状态提交事务,如果事务被标记为“回滚”态,则进行回滚操作
最后来看rollback方法:
public final void rollback(TransactionStatus status) throws TransactionException { // 如果当前事务已经完成,抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false); }
processRollback方法:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); // 如果当前事务存在保存点,则回滚到保存点的状态 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } // 如果是新的事务,则执行doRollback方法 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } doRollback(status); } else { // 如果有嵌套事务,则执行doSetRollbackOnly方法 if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker // 发生了不期望的回滚异常 if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status); } }
这里我们看到了一句熟悉的话"Transaction rolled back because it has been marked as rollback-only",相信一定有朋友在使用事务中看到过这个异常(不要犟),一般存在在这种场景下:A类的事务方法中调用了B类的事务方法,B方法发生了异常,但是在A类的try-catch模块中捕获了这个异常并正常执行。
这个异常发生的原因是:Spring默认的事务传播级别是propagation.REQUIRED,即如果有没有事务则新启一个事务,如果已经存在事务则加入这个事务。B方法发生了异常,给当前事务打上了rollbackOnly标记,但是被A捕获到了并正常执行,由于只有一个事务,等到A方法要提交这一个事务的时候,没有发现异常但是发现事务被打上了rollbackOnly,只能回滚并抛出一个unexpectedRollback异常。
- DynamicAdvisedInterceptor
现在,我们已经明确了事务的执行过程。那既然我们是使用注解来进行事务管理的,有了注解,就一定有注解的处理器,这时候就要搬出Spring的核心法宝——AOP。Spring就是通过切面来获取所有的Spring注解、解析注解并处理注解下的类、方法等。而AOP的核心是动态代理,我们找到AOP包下的Cglib动态代理类CglibAopProxy,其中有内部静态类DynamicAdvisedInterceptor
private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable { private final AdvisedSupport advised; public DynamicAdvisedInterceptor(AdvisedSupport advised) { this.advised = advised; } @Override @Nullable public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object oldProxy = null; boolean setProxyContext = false; Object target = null; TargetSource targetSource = this.advised.getTargetSource(); try { if (this.advised.exposeProxy) { // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // Get as late as possible to minimize the time we "own" the target, in case it comes from a pool... target = targetSource.getTarget(); Class<?> targetClass = (target != null ? target.getClass() : null); List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); Object retVal; // Check whether we only have one InvokerInterceptor: that is, // no real advice, but just reflective invocation of the target. if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) { // We can skip creating a MethodInvocation: just invoke the target directly. // Note that the final invoker must be an InvokerInterceptor, so we know // it does nothing but a reflective operation on the target, and no hot // swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = methodProxy.invoke(target, argsToUse); } else { // We need to create a method invocation... retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed(); } retVal = processReturnType(proxy, target, method, retVal); return retVal; } finally { if (target != null && !targetSource.isStatic()) { targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
其中有两个核心方法:
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass)
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
第一个方法是获取拦截链,第二个方法是创建拦截链的Cglib代理对象来执行。对于SpringAOP来讲,凡是需要AOP增强的地方都会进入到这个方法,加载一系列的拦截器,按照责任链模式执行。所有拦截器都会实现MethodInterceptor接口,和invoke方法。
- TransactionInterceptor
我们顺藤摸瓜,找到了与事务相关的拦截器TransactionInterceptor,它实现了AOP包下的MethodInterceptor接口,和一个核心的调用方法invoke:
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { // 获取目标类 Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null; // 执行事务方法 return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { @Nullable public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } public Object getTarget() { return invocation.getThis(); } public Object[] getArguments() { return invocation.getArguments(); } }); } }
invokeWithinTransaction
进入invokeWithinTransaction方法:
@Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); // 获取事务属性类TransactionAttribute final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 根据事务属性选择合适的事务管理器(如PlatformTransactionManager) final TransactionManager tm = determineTransactionManager(txAttr); ……………………………………………………………………………… }
getTransactionAttribute
再看getTransactionAttribute方法:
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { if (method.getDeclaringClass() == Object.class) { return null; } // 从缓存中获取TransactionAttribute Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute, // or an actual transaction attribute. if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { // 如果缓存中没有TransactionAttribute,则构建一个新的TransactionAttribute TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // Put it in the cache. if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } ……………………………………………………………………………… }
computeTransactionAttribute
再进入到computeTransactionAttribute方法中:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 校验当前注解的方法是否是public的(也就是@Transactional注解的方法必须是public的) if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // 获取真实目标对象的方法(获取代理对象真正代理的目标对象的执行方法) Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // 从目标对象的方法中获取事务属性(其实就是在找注解,先找方法上有没有注解) TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // 从目标对象的类中获取事务属性(再找类上有没有注解) txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
invokeWithinTransaction
拿到事务管理器后,再回到invokeWithinTransaction方法,我们看到一些核心代码:
PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 动态代理调用目标对象本身的数据库操作 // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 如果发生异常要进行回滚(内部是获取相应的TransactionManager,执行rollback方法) // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 释放事务资源 cleanupTransactionInfo(txInfo); } if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 提交事务(内部是获取相应的TransactionManager,执行commit方法) commitTransactionAfterReturning(txInfo); return retVal; }
总结
Spring通过AOP,在执行具有@Transactional注解的方法时,会根据目标类生成一个代理对象,在目标方法执行之前获取事务并开启事务,在目标方法执行之后提交事务或者回滚事务。当执行一个类中的普通方法时,Spring没有生成代理对象,即使这个方法内部调用了同类其他的事务方法,也就无法参与事务管理了。
团队介绍
天猫优品是天猫面向县域下沉市场的新零售品牌,依托集团大中台,围绕品牌-天猫优品平台-门店-消费者四个商业角色,构建了SBbc全链路新零售解决方案。自2020年6月回归天猫行业后,支撑了天猫优品直营业务的持续增长,同时沉淀了消电行业门店新零售数字化解决方案。