Spring事务原理二(事务拦截逻辑)(上)

简介: Spring事务原理二(事务拦截逻辑)

事务的执行

当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理,可阅读博客:java代理模式的这些细节,你知道多少?

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
复制代码

TransactionAspectSupport.invokeWithinTransaction()

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
    // 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取Transactional注解参数
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 从容器中获取TransactionManager实例
    final TransactionManager tm = determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }
    // TransactionManager转换成PlatformTransactionManager类型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 获取方法身份,类似com.github.AccountServiceImpl.save。
    // 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // 如果是声明式事务
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 获取事务,这里面就要考虑spring的事务传播机制
      // *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try {
        // 执行目标方法
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // 遇到异常的情况如何处理事务,要结合事务的传播机制来看。
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo
        cleanupTransactionInfo(txInfo);
      }
      // 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常
      if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          // 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }
      // 最后就是准备提交事务。
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }
    // 下面是编程式事务
    else {
      Object result;
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }
      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    }
  }
复制代码

总结下来,重点就下面几个方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
  // 调用目标方法前,根据情况确定要不要创建新的事务
  TransactionInfo txInfo = createTransactionIfNecessary();
  Object retVal;
  try{
    // 调用目标方法
    retVal = invocation.proceedWithInvocation();
  }catch(Throwable ex){
    // 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
  }finally{
    // 清除线程中当前事务信息
    cleanupTransactionInfo(txInfo);
  }
  // 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等
  commitTransactionAfterReturning(txInfo);
  return retVal;
}
复制代码

createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  // 下面就是包装了一下
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
  // *****这是重点,获取事务状态******
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
  // 这里就是再包装一层返回TransactionInfo对象
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }
@Override
  public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    // 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    // 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // *******************这是重点,需要结合事务传播机制来看*******************
      return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 检测超时时间是否合法
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 下面都是第一次调用。说明之前不存在任何事务。
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             //PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功
             // 如果内部事务成功,外部事务失败,那么一起回滚
             // 如果内部事务成功,外部事务成功,那么一起提交
             // 如果内部事务失败,外部事务失败,那么一起回滚
             // 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起上一个事务,目前不存在上一个事务,那么直接传null
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
        // 开启新事务
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
      }
    }
    // 剩下的传播机制,都创建空事务
    else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
            "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
  }
// 开启新事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  // 创建一个新事务, transaction里面的connectionHolder=null
  DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  // 开启新事务
   doBegin(transaction, definition);
  // 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理
   prepareSynchronization(status, definition);
   return status;
}
@Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      // 因为是新的事务,所以需要创建新的链接Connection
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 获取新的Connection
        Connection newCon = obtainDataSource().getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        // 把新的Connection设置进transaction中
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      // 设置readOnly和isolationLevel
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 如果是自动提交,切换到手动提交
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        con.setAutoCommit(false);
      }
      // 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
        // 把新创建出来的ConnectionHolder绑定到当前线程中
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      // 出异常了就释放链接
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }


相关文章
|
1月前
|
XML Java 开发者
Spring Boot开箱即用可插拔实现过程演练与原理剖析
【11月更文挑战第20天】Spring Boot是一个基于Spring框架的项目,其设计目的是简化Spring应用的初始搭建以及开发过程。Spring Boot通过提供约定优于配置的理念,减少了大量的XML配置和手动设置,使得开发者能够更专注于业务逻辑的实现。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,为开发者提供一个全面的理解。
36 0
|
10天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
54 14
|
24天前
|
缓存 安全 Java
Spring高手之路26——全方位掌握事务监听器
本文深入探讨了Spring事务监听器的设计与实现,包括通过TransactionSynchronization接口和@TransactionalEventListener注解实现事务监听器的方法,并通过实例详细展示了如何在事务生命周期的不同阶段执行自定义逻辑,提供了实际应用场景中的最佳实践。
42 2
Spring高手之路26——全方位掌握事务监听器
|
25天前
|
Java 关系型数据库 数据库
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
|
1月前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
56 1
Spring高手之路24——事务类型及传播行为实战指南
|
1月前
|
Java 开发者 Spring
Spring AOP 底层原理技术分享
Spring AOP(面向切面编程)是Spring框架中一个强大的功能,它允许开发者在不修改业务逻辑代码的情况下,增加额外的功能,如日志记录、事务管理等。本文将深入探讨Spring AOP的底层原理,包括其核心概念、实现方式以及如何与Spring框架协同工作。
|
1月前
|
Java 开发者 Spring
Spring AOP深度解析:探秘动态代理与增强逻辑
Spring框架中的AOP(Aspect-Oriented Programming,面向切面编程)功能为开发者提供了一种强大的工具,用以将横切关注点(如日志、事务管理等)与业务逻辑分离。本文将深入探讨Spring AOP的底层原理,包括动态代理机制和增强逻辑的实现。
44 4
|
1月前
|
JavaScript Java 关系型数据库
Spring事务失效的8种场景
本文总结了使用 @Transactional 注解时事务可能失效的几种情况,包括数据库引擎不支持事务、类未被 Spring 管理、方法非 public、自身调用、未配置事务管理器、设置为不支持事务、异常未抛出及异常类型不匹配等。针对这些情况,文章提供了相应的解决建议,帮助开发者排查和解决事务不生效的问题。
|
1月前
|
XML Java 数据库连接
Spring中的事务是如何实现的
Spring中的事务管理机制通过一系列强大的功能和灵活的配置选项,为开发者提供了高效且可靠的事务处理手段。无论是通过注解还是AOP配置,Spring都能轻松实现复杂的事务管理需求。掌握这些工具和最佳实践,能
58 3
|
2月前
|
存储 缓存 Java
Spring高手之路23——AOP触发机制与代理逻辑的执行
本篇文章深入解析了Spring AOP代理的触发机制和执行流程,从源码角度详细讲解了Bean如何被AOP代理,包括代理对象的创建、配置与执行逻辑,帮助读者全面掌握Spring AOP的核心技术。
54 3
Spring高手之路23——AOP触发机制与代理逻辑的执行