Spring事务原理二(事务拦截逻辑)(上)

简介: Spring事务原理二(事务拦截逻辑)

事务的执行

当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理,可阅读博客:java代理模式的这些细节,你知道多少?

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
复制代码

TransactionAspectSupport.invokeWithinTransaction()

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
    // 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取Transactional注解参数
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 从容器中获取TransactionManager实例
    final TransactionManager tm = determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }
    // TransactionManager转换成PlatformTransactionManager类型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 获取方法身份,类似com.github.AccountServiceImpl.save。
    // 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // 如果是声明式事务
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 获取事务,这里面就要考虑spring的事务传播机制
      // *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try {
        // 执行目标方法
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // 遇到异常的情况如何处理事务,要结合事务的传播机制来看。
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo
        cleanupTransactionInfo(txInfo);
      }
      // 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常
      if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          // 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }
      // 最后就是准备提交事务。
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }
    // 下面是编程式事务
    else {
      Object result;
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }
      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    }
  }
复制代码

总结下来,重点就下面几个方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
  // 调用目标方法前,根据情况确定要不要创建新的事务
  TransactionInfo txInfo = createTransactionIfNecessary();
  Object retVal;
  try{
    // 调用目标方法
    retVal = invocation.proceedWithInvocation();
  }catch(Throwable ex){
    // 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
  }finally{
    // 清除线程中当前事务信息
    cleanupTransactionInfo(txInfo);
  }
  // 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等
  commitTransactionAfterReturning(txInfo);
  return retVal;
}
复制代码

createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  // 下面就是包装了一下
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
  // *****这是重点,获取事务状态******
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
  // 这里就是再包装一层返回TransactionInfo对象
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }
@Override
  public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    // 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    // 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // *******************这是重点,需要结合事务传播机制来看*******************
      return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 检测超时时间是否合法
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 下面都是第一次调用。说明之前不存在任何事务。
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             //PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功
             // 如果内部事务成功,外部事务失败,那么一起回滚
             // 如果内部事务成功,外部事务成功,那么一起提交
             // 如果内部事务失败,外部事务失败,那么一起回滚
             // 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起上一个事务,目前不存在上一个事务,那么直接传null
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
        // 开启新事务
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
      }
    }
    // 剩下的传播机制,都创建空事务
    else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
            "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
  }
// 开启新事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  // 创建一个新事务, transaction里面的connectionHolder=null
  DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  // 开启新事务
   doBegin(transaction, definition);
  // 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理
   prepareSynchronization(status, definition);
   return status;
}
@Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      // 因为是新的事务,所以需要创建新的链接Connection
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 获取新的Connection
        Connection newCon = obtainDataSource().getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        // 把新的Connection设置进transaction中
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      // 设置readOnly和isolationLevel
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 如果是自动提交,切换到手动提交
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        con.setAutoCommit(false);
      }
      // 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
        // 把新创建出来的ConnectionHolder绑定到当前线程中
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      // 出异常了就释放链接
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }


相关文章
|
4天前
|
Java 数据库 开发者
|
4天前
|
Java 关系型数据库 MySQL
【JavaEE】Spring事务-@Transactional参数介绍-事务的隔离级别以及传播机制
【JavaEE】Spring事务-@Transactional参数介绍-事务的隔离级别以及传播机制
9 0
|
4天前
|
消息中间件 Java 关系型数据库
【JavaEE】Spring事务-事务的基本介绍-事务的实现-@Transactional基本介绍和使用
【JavaEE】Spring事务-事务的基本介绍-事务的实现-@Transactional基本介绍和使用
8 0
|
4天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
18 0
|
4天前
|
SQL Java 关系型数据库
Spring 事务
Spring 事务
12 1
|
4天前
|
Java 数据库连接 数据库
Spring事务简介,事务角色,事务属性
Spring事务简介,事务角色,事务属性
18 2
|
4天前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
38 6
|
4天前
|
Java 数据库连接 数据库
16:事务-Java Spring
16:事务-Java Spring
29 5
|
4天前
|
消息中间件 Java 关系型数据库
Spring事务与分布式事务
这篇文档介绍了事务的概念和数据库事务的ACID特性:原子性、一致性、隔离性和持久性。在并发环境下,事务可能出现更新丢失、脏读和不可重复读等问题,这些问题通过设置事务隔离级别(如读未提交、读已提交、可重复读和序列化)来解决。Spring事务传播行为有七种模式,影响嵌套事务的执行方式。`@Transactional`注解用于管理事务,其属性包括传播行为、隔离级别、超时和只读等。最后提到了分布式事务,分为跨库和跨服务两种情况,跨服务的分布式事务通常通过最终一致性策略,如消息队列实现。
|
4天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。