Spring事务原理二(事务拦截逻辑)(上)

简介: Spring事务原理二(事务拦截逻辑)

事务的执行

当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理,可阅读博客:java代理模式的这些细节,你知道多少?

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
复制代码

TransactionAspectSupport.invokeWithinTransaction()

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
    // 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取Transactional注解参数
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 从容器中获取TransactionManager实例
    final TransactionManager tm = determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }
    // TransactionManager转换成PlatformTransactionManager类型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 获取方法身份,类似com.github.AccountServiceImpl.save。
    // 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // 如果是声明式事务
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 获取事务,这里面就要考虑spring的事务传播机制
      // *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try {
        // 执行目标方法
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // 遇到异常的情况如何处理事务,要结合事务的传播机制来看。
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo
        cleanupTransactionInfo(txInfo);
      }
      // 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常
      if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          // 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }
      // 最后就是准备提交事务。
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }
    // 下面是编程式事务
    else {
      Object result;
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }
      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    }
  }
复制代码

总结下来,重点就下面几个方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
  // 调用目标方法前,根据情况确定要不要创建新的事务
  TransactionInfo txInfo = createTransactionIfNecessary();
  Object retVal;
  try{
    // 调用目标方法
    retVal = invocation.proceedWithInvocation();
  }catch(Throwable ex){
    // 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
  }finally{
    // 清除线程中当前事务信息
    cleanupTransactionInfo(txInfo);
  }
  // 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等
  commitTransactionAfterReturning(txInfo);
  return retVal;
}
复制代码

createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  // 下面就是包装了一下
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
  // *****这是重点,获取事务状态******
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
  // 这里就是再包装一层返回TransactionInfo对象
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }
@Override
  public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    // 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    // 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // *******************这是重点,需要结合事务传播机制来看*******************
      return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 检测超时时间是否合法
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 下面都是第一次调用。说明之前不存在任何事务。
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             //PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功
             // 如果内部事务成功,外部事务失败,那么一起回滚
             // 如果内部事务成功,外部事务成功,那么一起提交
             // 如果内部事务失败,外部事务失败,那么一起回滚
             // 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起上一个事务,目前不存在上一个事务,那么直接传null
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
        // 开启新事务
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
      }
    }
    // 剩下的传播机制,都创建空事务
    else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
            "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
  }
// 开启新事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  // 创建一个新事务, transaction里面的connectionHolder=null
  DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  // 开启新事务
   doBegin(transaction, definition);
  // 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理
   prepareSynchronization(status, definition);
   return status;
}
@Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      // 因为是新的事务,所以需要创建新的链接Connection
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 获取新的Connection
        Connection newCon = obtainDataSource().getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        // 把新的Connection设置进transaction中
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      // 设置readOnly和isolationLevel
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 如果是自动提交,切换到手动提交
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        con.setAutoCommit(false);
      }
      // 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
        // 把新创建出来的ConnectionHolder绑定到当前线程中
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      // 出异常了就释放链接
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }


相关文章
|
7天前
|
XML Java 开发者
Spring Boot开箱即用可插拔实现过程演练与原理剖析
【11月更文挑战第20天】Spring Boot是一个基于Spring框架的项目,其设计目的是简化Spring应用的初始搭建以及开发过程。Spring Boot通过提供约定优于配置的理念,减少了大量的XML配置和手动设置,使得开发者能够更专注于业务逻辑的实现。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,为开发者提供一个全面的理解。
19 0
|
15天前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
31 1
Spring高手之路24——事务类型及传播行为实战指南
|
9天前
|
XML Java 数据库连接
Spring中的事务是如何实现的
Spring中的事务管理机制通过一系列强大的功能和灵活的配置选项,为开发者提供了高效且可靠的事务处理手段。无论是通过注解还是AOP配置,Spring都能轻松实现复杂的事务管理需求。掌握这些工具和最佳实践,能
15 3
|
1月前
|
存储 缓存 Java
Spring高手之路23——AOP触发机制与代理逻辑的执行
本篇文章深入解析了Spring AOP代理的触发机制和执行流程,从源码角度详细讲解了Bean如何被AOP代理,包括代理对象的创建、配置与执行逻辑,帮助读者全面掌握Spring AOP的核心技术。
38 3
Spring高手之路23——AOP触发机制与代理逻辑的执行
|
2月前
|
Java 数据库连接 数据库
spring复习05,spring整合mybatis,声明式事务
这篇文章详细介绍了如何在Spring框架中整合MyBatis以及如何配置声明式事务。主要内容包括:在Maven项目中添加依赖、创建实体类和Mapper接口、配置MyBatis核心配置文件和映射文件、配置数据源、创建sqlSessionFactory和sqlSessionTemplate、实现Mapper接口、配置声明式事务以及测试使用。此外,还解释了声明式事务的传播行为、隔离级别、只读提示和事务超时期间等概念。
spring复习05,spring整合mybatis,声明式事务
|
1月前
|
Java Spring 容器
Spring底层原理大致脉络
Spring底层原理大致脉络
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
128 9
|
1月前
|
Java 关系型数据库 MySQL
Spring事务失效,我总结了这7个主要原因
本文详细探讨了Spring事务在日常开发中常见的七个失效原因,包括数据库不支持事务、类不受Spring管理、事务方法非public、异常被捕获、`rollbackFor`属性配置错误、方法内部调用事务方法及事务传播属性使用不当。通过具体示例和源码分析,帮助开发者更好地理解和应用Spring事务机制,避免线上事故。适合所有使用Spring进行业务开发的工程师参考。
32 2
|
1月前
|
Java 程序员 Spring
Spring事务的1道面试题
每次聊起Spring事务,好像很熟悉,又好像很陌生。本篇通过一道面试题和一些实践,来拆解几个Spring事务的常见坑点。
Spring事务的1道面试题
|
2月前
|
Java Spring
Spring 事务传播机制是什么?
Spring 事务传播机制是什么?
23 4
下一篇
无影云桌面