Spring事务原理二(事务拦截逻辑)(下)

简介: Spring事务原理二(事务拦截逻辑)

前面已经创建过事务的情况:

private TransactionStatus handleExistingTransaction(
      TransactionDefinition definition, Object transaction, boolean debugEnabled)
      throws TransactionException {
  // 那么遇到不允许有事务的传播机制,就直接抛异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
      throw new IllegalTransactionStateException(
          "Existing transaction found for transaction marked with propagation 'never'");
    }
  // PROPAGATION_NOT_SUPPORTED就是以非事务的方式执行,那么我们就需要先把当前的事务挂起来
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
      if (debugEnabled) {
        logger.debug("Suspending current transaction");
      }
      // 挂起当前事务
      // 1.把当前线程的connectionHolder解绑并返回connectionHolder
      // 2.取出当前事务同步器中的所有信息,并重置线程同步器信息
      // 3.把connectionHolder和相关信息封装成一个SuspendedResourcesHolder对象返回
      Object suspendedResources = suspend(transaction);
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 创建一个DefaultTransactionStatus返回
      return prepareTransactionStatus(
          definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
  // 如果是PROPAGATION_REQUIRES_NEW,那么必须要创建一个新的事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
      if (debugEnabled) {
        logger.debug("Suspending current transaction, creating new transaction with name [" +
            definition.getName() + "]");
      }
      // 挂起当前事务
      SuspendedResourcesHolder suspendedResources = suspend(transaction);
      try {
        // 开启新事务
        return startTransaction(definition, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error beginEx) {
        resumeAfterBeginException(transaction, suspendedResources, beginEx);
        throw beginEx;
      }
    }
  // 如果是PROPAGATION_NESTED,那么就是嵌套事务。就要使用回滚点
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      if (!isNestedTransactionAllowed()) {
        throw new NestedTransactionNotSupportedException(
            "Transaction manager does not allow nested transactions by default - " +
            "specify 'nestedTransactionAllowed' property with value 'true'");
      }
      if (debugEnabled) {
        logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
      }
      if (useSavepointForNestedTransaction()) {
        // Create savepoint within existing Spring-managed transaction,
        // through the SavepointManager API implemented by TransactionStatus.
        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
        // 同样的还是创建一个DefaultTransactionStatus对象
        DefaultTransactionStatus status =
            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
        // 创建回滚点后返回
        status.createAndHoldSavepoint();
        return status;
      }
      else {
        // Nested transaction through nested begin and commit/rollback calls.
        // Usually only for JTA: Spring synchronization might get activated here
        // in case of a pre-existing JTA transaction.
        // 针对JTA事务,创建一个新的事务来处理
        return startTransaction(definition, transaction, debugEnabled, null);
      }
    }
    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
      logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
      if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
          Constants isoConstants = DefaultTransactionDefinition.constants;
          throw new IllegalTransactionStateException("Participating transaction with definition [" +
              definition + "] specifies isolation level which is incompatible with existing transaction: " +
              (currentIsolationLevel != null ?
                  isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                  "(unknown)"));
        }
      }
      if (!definition.isReadOnly()) {
        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
          throw new IllegalTransactionStateException("Participating transaction with definition [" +
              definition + "] is not marked as read-only but existing transaction is");
        }
      }
    }
  // 剩下的传播机制,还是当前事务
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  }
复制代码

总结一下:

1.createTransactionIfNecessary():当代理对象方法被调用时,获取TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification,利用这三个参数来创建事务TransactionInfo

  • 1.1.tm.getTransaction():获取事务
  • 1.1.1:doGetTransaction():创建新的DataSourceTransactionObject,并从当前线程中查找connectionHolder,不管有没有找到,都设置进DataSourceTransactionObject对象中。
  • 1.1.2:isExistingTransaction():判断是否已经存在事务,其实就是判断DataSourceTransactionObject中是否有connectionHolder,并且事务是否是激活的。
  • 1.1.3:handleExistingTransaction():如果已经存在事务。-----------这是逻辑分支------------
  • 1.1.3.1:如果存在事务,当传播机制是PROPAGATION_NEVER,那么直接抛异常。
  • 1.1.3.2:如果存在事务,当传播机制是PROPAGATION_NOT_SUPPORTED,那么直接挂起当前事务。创建DefaultTransactionStatus对象,并且更新当前线程事务信息。
  • 1.1.3.3:如果存在事务,当传播机制是PROPAGATION_REQUIRES_NEW,那么先挂起当前事务,并且创建新的连接设置进新事务中,并通过新创建的transaction封装一个DefaultTransactionStatus对象,同时更新当前线程中事务信息。
  • 1.1.3.4:如果存在事务,当传播机制是PROPAGATION_NESTED,那么直接封装一个DefaultTransactionStatus对象,更新当前线程事务信息,并设置保存点后返回。
  • 1.1.3.5:如果存在事务,当传播机制是PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY、PROPAGATION_REQUIRED,那么直接创建一个DefaultTransactionStatus对象,更新当前线程的事务信息后返回。
  • 1.1.4:如果不存在事务,传播机制是PROPAGATION_MANDATORY,就抛异常。该传播机制的要求是当前一定要存在事务才能继续执行,否则就抛异常。-----------这是逻辑分支------------
  • 1.1.5:如果不存在事务,传播机制是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,那么意味着就需要自己创建事务。-----------这是逻辑分支------------
  • 1.1.5.1:suspend(null):挂起空事务。就是把当前线程中的事务相关信息都取出来封装到一个SuspendedResourcesHolder对象中并返回。并且重置当前线程中事务相关信息,通过TransactionSynchronizationManager来设置。
  • 1.1.5.2:startTransaction():开启新事务
  • 1.1.5.2.1:doBegin():创建新的ConnectionHolder,并设置进transaction中,开启手动提交,最后绑定到当前线程中
  • 1.1.5.2.2:prepareSynchronization():创建DefaultTransactionStatus对象,更新当前线程中的信息。因为此时已经创建了新的事务了。
  • 1.1.6:如果不存在事务,传播机制是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER,那么就直接创建一个DefaultTransactionStatus对象返回。同时更新一下当前线程中的信息。-----------这是逻辑分支------------
  • 1.2.prepareTransactionInfo():把TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification打包成TransactionInfo对象,并且把TransactionStatus设置进去,最后把oldTransactionInfo解绑并取出来保存,把最新的TransactionInfo和当前线程绑定
protected static final class TransactionInfo {
  private void bindToThread() {
      // 从当前线程中取出来,然后把新的TransactionInfo和当前线程绑定
      this.oldTransactionInfo = transactionInfoHolder.get();
      transactionInfoHolder.set(this);
    }
}
复制代码
  • 其实总结一下上面的逻辑,createTransactionIfNecessary()最终创建了一个TransactionInfo对象,但是整个过程重点在于通过getTransaction()方法获取TransactionStatus对象,创建这个对象重点取决于创建Transaction,然后这个对象里面其实是包含了最关键的connectionHolder。

completeTransactionAfterThrowing:捕获执行目标代码后抛出的异常,理论上应该是在这里面做rollback

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
            "] after exception: " + ex);
      }
      if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
        try {
          // 如果这个抛出的异常正好匹配上Transaction注解上指定的异常,那么执行回滚操作
          txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
        }
        catch (TransactionSystemException ex2) {
          logger.error("Application exception overridden by rollback exception", ex);
          ex2.initApplicationException(ex);
          throw ex2;
        }
        catch (RuntimeException | Error ex2) {
          logger.error("Application exception overridden by rollback exception", ex);
          throw ex2;
        }
      }
      else {
        // We don't roll back on this exception.
        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
        try {
          // 如果异常匹配不上,那么还是依然执行提交。如果TransactionStatus里面的状态被标记为需要回滚的话,那么还是依然回滚,否则就提交
          txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
        catch (TransactionSystemException ex2) {
          logger.error("Application exception overridden by commit exception", ex);
          ex2.initApplicationException(ex);
          throw ex2;
        }
        catch (RuntimeException | Error ex2) {
          logger.error("Application exception overridden by commit exception", ex);
          throw ex2;
        }
      }
    }
  }
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
      boolean unexpectedRollback = unexpected;
      try {
        triggerBeforeCompletion(status);
        // 检查是否有保存点,如果有保存点,就重置到保存点
        if (status.hasSavepoint()) {
          if (status.isDebug()) {
            logger.debug("Rolling back transaction to savepoint");
          }
          // 重置到保存点
          status.rollbackToHeldSavepoint();
        }
        // 如果是新的事务,那么就直接回滚
        else if (status.isNewTransaction()) {
          if (status.isDebug()) {
            logger.debug("Initiating transaction rollback");
          }
          // 其实是拿出里面的ConnectionHolder来调用rollback
          doRollback(status);
        }
        else {
          // Participating in larger transaction
          // 如果不是新的事务,那么为了避免多次回滚,只需要将当前的rollback标记设置成true,这样的话,当上层代理catch这个抛出去的异常时,会检查rollback标记,最后才会回滚。
          if (status.hasTransaction()) {
            if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
              if (status.isDebug()) {
                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
              }
              doSetRollbackOnly(status);
            }
            else {
              if (status.isDebug()) {
                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
              }
            }
          }
          else {
            logger.debug("Should roll back transaction but cannot - no transaction available");
          }
          // Unexpected rollback only matters here if we're asked to fail early
          if (!isFailEarlyOnGlobalRollbackOnly()) {
            unexpectedRollback = false;
          }
        }
      }
      catch (RuntimeException | Error ex) {
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
        throw ex;
      }
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
      // Raise UnexpectedRollbackException if we had a global rollback-only marker
      if (unexpectedRollback) {
        throw new UnexpectedRollbackException(
            "Transaction rolled back because it has been marked as rollback-only");
      }
    }
    finally {
      // 解绑connectionHolder,重置connectionHolder,并且放回链接池。把挂起的事务恢复。
      cleanupAfterCompletion(status);
    }
  }
复制代码

cleanupTransactionInfo(txInfo);这个方法其实只是把线程里面的TransactionInfo对象替换成了oldTransactionInfo对象,并没有修改当前的TransactionInfo对象。

commitTransactionAfterReturning

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
      }
      // 其实在这里面还是会判断TransactionStatus里面是否有回滚状态,如果有的话,依然会回滚,否则就提交
      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
  }
复制代码
  • 如果捕捉到异常,那么直接进入到completeTransactionAfterThrowing()方法中。从completeTransactionAfterThrowing()方法出来,把异常抛出去之前,还要执行finally中的cleanupTransactionInfo()方法
  • completeTransactionAfterThrowing()方法:
  • 1.判断当前异常是否匹配注解设置的异常,是的话,进入
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
复制代码
  • 1.1:判断是否有保存点,关联到传播机制为PROPAGATION_NESTED,如果有保存点,那么直接回滚到保存点
  • 1.2:判断是否是新事务,关联到传播机制为PROPAGATION_REQUIRES_NEW,那么直接执行回滚操作
  • 1.3:判断不是新事务,那么直接设置rollback标记,避免多次回滚。因为异常会一直往外抛,这个时候只需要设置回滚标记即可。
  • 1.4:最后都会cleanupAfterCompletion():修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
  • 2.如果没有匹配到这个异常的话,那么进入
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
复制代码
  • 2.1:判断事务是否被标记为回滚,那么说明内层事务之前已经设置了回滚标记,此时当前事务是新事务的话,就直接回滚。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
  • 2.2:如果当前事务被标记回滚,并且不是新事务的话,那么继续标记回滚,并且抛出异常。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
  • 2.3:如果不满足以上条件,那么直接提交。如果有保存点,那么直接释放保存点。如果是新事务,直接提交。如果提交失败,那么回滚。最后进行清理工作。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。


相关文章
|
4天前
|
Java 数据库 开发者
|
4天前
|
Java 关系型数据库 MySQL
【JavaEE】Spring事务-@Transactional参数介绍-事务的隔离级别以及传播机制
【JavaEE】Spring事务-@Transactional参数介绍-事务的隔离级别以及传播机制
9 0
|
4天前
|
消息中间件 Java 关系型数据库
【JavaEE】Spring事务-事务的基本介绍-事务的实现-@Transactional基本介绍和使用
【JavaEE】Spring事务-事务的基本介绍-事务的实现-@Transactional基本介绍和使用
8 0
|
4天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
18 0
|
4天前
|
SQL Java 关系型数据库
Spring 事务
Spring 事务
12 1
|
4天前
|
Java 数据库连接 数据库
Spring事务简介,事务角色,事务属性
Spring事务简介,事务角色,事务属性
18 2
|
4天前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
38 6
|
4天前
|
Java 数据库连接 数据库
16:事务-Java Spring
16:事务-Java Spring
29 5
|
4天前
|
消息中间件 Java 关系型数据库
Spring事务与分布式事务
这篇文档介绍了事务的概念和数据库事务的ACID特性:原子性、一致性、隔离性和持久性。在并发环境下,事务可能出现更新丢失、脏读和不可重复读等问题,这些问题通过设置事务隔离级别(如读未提交、读已提交、可重复读和序列化)来解决。Spring事务传播行为有七种模式,影响嵌套事务的执行方式。`@Transactional`注解用于管理事务,其属性包括传播行为、隔离级别、超时和只读等。最后提到了分布式事务,分为跨库和跨服务两种情况,跨服务的分布式事务通常通过最终一致性策略,如消息队列实现。
|
4天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。