Spring事务原理二(事务拦截逻辑)(上)

简介: Spring事务原理二(事务拦截逻辑)

事务的执行

当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理,可阅读博客:java代理模式的这些细节,你知道多少?

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
复制代码

TransactionAspectSupport.invokeWithinTransaction()

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
    // 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取Transactional注解参数
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 从容器中获取TransactionManager实例
    final TransactionManager tm = determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }
    // TransactionManager转换成PlatformTransactionManager类型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 获取方法身份,类似com.github.AccountServiceImpl.save。
    // 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // 如果是声明式事务
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 获取事务,这里面就要考虑spring的事务传播机制
      // *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try {
        // 执行目标方法
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // 遇到异常的情况如何处理事务,要结合事务的传播机制来看。
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo
        cleanupTransactionInfo(txInfo);
      }
      // 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常
      if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          // 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }
      // 最后就是准备提交事务。
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }
    // 下面是编程式事务
    else {
      Object result;
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }
      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    }
  }
复制代码

总结下来,重点就下面几个方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
  // 调用目标方法前,根据情况确定要不要创建新的事务
  TransactionInfo txInfo = createTransactionIfNecessary();
  Object retVal;
  try{
    // 调用目标方法
    retVal = invocation.proceedWithInvocation();
  }catch(Throwable ex){
    // 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
  }finally{
    // 清除线程中当前事务信息
    cleanupTransactionInfo(txInfo);
  }
  // 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等
  commitTransactionAfterReturning(txInfo);
  return retVal;
}
复制代码

createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  // 下面就是包装了一下
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
  // *****这是重点,获取事务状态******
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
  // 这里就是再包装一层返回TransactionInfo对象
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }
@Override
  public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    // 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    // 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // *******************这是重点,需要结合事务传播机制来看*******************
      return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 检测超时时间是否合法
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 下面都是第一次调用。说明之前不存在任何事务。
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             //PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功
             // 如果内部事务成功,外部事务失败,那么一起回滚
             // 如果内部事务成功,外部事务成功,那么一起提交
             // 如果内部事务失败,外部事务失败,那么一起回滚
             // 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起上一个事务,目前不存在上一个事务,那么直接传null
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
        // 开启新事务
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
      }
    }
    // 剩下的传播机制,都创建空事务
    else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
            "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
  }
// 开启新事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  // 创建一个新事务, transaction里面的connectionHolder=null
  DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  // 开启新事务
   doBegin(transaction, definition);
  // 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理
   prepareSynchronization(status, definition);
   return status;
}
@Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      // 因为是新的事务,所以需要创建新的链接Connection
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 获取新的Connection
        Connection newCon = obtainDataSource().getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        // 把新的Connection设置进transaction中
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      // 设置readOnly和isolationLevel
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 如果是自动提交,切换到手动提交
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        con.setAutoCommit(false);
      }
      // 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
        // 把新创建出来的ConnectionHolder绑定到当前线程中
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      // 出异常了就释放链接
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }


相关文章
|
1月前
|
XML Java 开发者
Spring Boot中的bean注入方式和原理
Spring Boot中的bean注入方式和原理
53 0
|
1月前
|
监控 Java 数据处理
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
|
1月前
|
缓存 Java API
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
|
26天前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
59 1
|
26天前
|
存储 XML 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南(一)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南
60 0
|
27天前
|
XML 缓存 Java
天天用 Spring,bean 实例化原理你懂吗
天天用 Spring,bean 实例化原理你懂吗
17 0
|
1月前
|
前端开发 Java 数据管理
Spring MVC 工作原理解析
Spring MVC 工作原理解析
22 0
|
1月前
|
XML Java 数据库
【二十四】springboot整合spring事务详解以及实战
【二十四】springboot整合spring事务详解以及实战
104 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
1月前
|
Java 大数据 数据库
【Spring底层原理高级进阶】Spring Batch清洗和转换数据,一键处理繁杂数据!Spring Batch是如何实现IO流优化的?本文详解!
【Spring底层原理高级进阶】Spring Batch清洗和转换数据,一键处理繁杂数据!Spring Batch是如何实现IO流优化的?本文详解!