Spring事务如何集成到Mybatis之springboot事务

简介: Spring事务如何集成到Mybatis之springboot事务

SpringBoot中通过DataSourceTransactionManagerAutoConfiguration自动配置类,生成基于数据库的事务管理类:DataSourceTransactionManager,该类提供了事务的相关操作如:commit,rollback等。同时配置类通过注解:@EnableTransactionManagement启动声明式事务管理,本质上是解析@Transactional注解,并生成一个Advisor,用于事务的切面配置。


ProxyTransactionManagementConfiguration



@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
  @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
    //事务切面
    BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(transactionAttributeSource());
    advisor.setAdvice(transactionInterceptor());
    advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
    return advisor;
  }
  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionAttributeSource transactionAttributeSource() {
    return new AnnotationTransactionAttributeSource();
  }
  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionInterceptor transactionInterceptor() {
    TransactionInterceptor interceptor = new TransactionInterceptor();
    interceptor.setTransactionAttributeSource(transactionAttributeSource());
    if (this.txManager != null) {
      interceptor.setTransactionManager(this.txManager);
    }
    return interceptor;
  }
}


Spring的动态代理自动配置类AspectJAwareAdvisorAutoProxyCreator,该类本质上是一个BeanPostProcessor



public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
    implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
  /**
   * Create a proxy with the configured interceptors if the bean is
   * identified as one to proxy by the subclass.
   * @see #getAdvicesAndAdvisorsForBean
   */
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean != null) {
      Object cacheKey = getCacheKey(bean.getClass(), beanName);
      if (!this.earlyProxyReferences.contains(cacheKey)) {
        return wrapIfNecessary(bean, beanName, cacheKey);
      }
    }
    return bean;
  }
    /**
   * Wrap the given bean if necessary, i.e. if it is eligible for being proxied.
   * @param bean the raw bean instance
   * @param beanName the name of the bean
   * @param cacheKey the cache key for metadata access
   * @return a proxy wrapping the bean, or the raw bean instance as-is
   */
  protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (beanName != null && this.targetSourcedBeans.contains(beanName)) {
      return bean;
    }
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
      return bean;
    }
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
      this.advisedBeans.put(cacheKey, Boolean.FALSE);
      return bean;
    }
    // 在此处判断事务切面是否命中,命中则创建代理
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
      this.advisedBeans.put(cacheKey, Boolean.TRUE);
      //创建事务代理类
      Object proxy = createProxy(
          bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
      this.proxyTypes.put(cacheKey, proxy.getClass());
      return proxy;
    }
    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
  }
  //省略其他代码...
  }


事务代理类的具体代理逻辑



public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  @Override
  public Object invoke(final MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
      @Override
      public Object proceedWithInvocation() throws Throwable {
        return invocation.proceed();
      }
    });
  }
  //省略其他代码...
}
//具体的实现逻辑在其父类:TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
      throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
      // 具体的事务创建逻辑
      TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
      Object retVal = null;
      try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        //事务异常回滚
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        cleanupTransactionInfo(txInfo);
      }
      //事务提交
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
            new TransactionCallback<Object>() {
              @Override
              public Object doInTransaction(TransactionStatus status) {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                  return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                  if (txAttr.rollbackOn(ex)) {
                    // A RuntimeException: will lead to a rollback.
                    if (ex instanceof RuntimeException) {
                      throw (RuntimeException) ex;
                    }
                    else {
                      throw new ThrowableHolderException(ex);
                    }
                  }
                  else {
                    // A normal return value: will lead to a commit.
                    throwableHolder.throwable = ex;
                    return null;
                  }
                }
                finally {
                  cleanupTransactionInfo(txInfo);
                }
              }
            });
        // Check result state: It might indicate a Throwable to rethrow.
        if (throwableHolder.throwable != null) {
          throw throwableHolder.throwable;
        }
        return result;
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
    //....
    }
  }


事务的具体创建逻辑:



protected TransactionInfo createTransactionIfNecessary(
      PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        //平台事务管理器,具体的实现是:DataSourceTransactionManager
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
//DataSourceTransactionManager中的具体实现逻辑
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
    implements ResourceTransactionManager, InitializingBean {
  //父类的方法,此处为了说明问题,放在子类中来讲
  @Override
  public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    //...
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
      }
      try {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        //启用事务的核心逻辑
        doBegin(transaction, definition);
        //启用事务同步的核心逻辑,此处实现了Spring事务和Mybatis的关联
        prepareSynchronization(status, definition);
        return status;
      }
      catch (RuntimeException ex) {
        resume(null, suspendedResources);
        throw ex;
      }
      catch (Error err) {
        resume(null, suspendedResources);
        throw err;
      }
    }
      //...
  }
  @Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        Connection newCon = this.dataSource.getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        //启用事务,关闭自动提交
        con.setAutoCommit(false);
      }
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // 以DataSource为Key,通过ThreadLocal将ConnectionHolder绑定到当前线程
      if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, this.dataSource);
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }  
    //启用事务同步  
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
          definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
              definition.getIsolationLevel() : null);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
      TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
      //启用事务同步,在Mybatis中会依据事务同步判断是否启用事务
      TransactionSynchronizationManager.initSynchronization();
    }
  }  
}
//Spring通过TransactionSynchronizationManager来将事务同步信息绑定到线程
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
      throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    //本质上是一个ThreadLocal
    synchronizations.set(new LinkedHashSet<TransactionSynchronization>());
  }
//mybatis
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
    SqlSessionHolder holder;
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        //...
        holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
        //同一个线程中mybatis和spring事务关联
        TransactionSynchronizationManager.bindResource(sessionFactory, holder);
        TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
        holder.setSynchronizedWithTransaction(true);
        holder.requested();
      } 
    //...
}


总结



Spring中启用事务后,会将数据库的连接对象通过ThreadLocal绑定到当前线程,以便后续同一个线程拿到的是同一个连接对象;同时会通过TransactionSynchronizationManager启用事务同步,便于后续Mybatis判断是否启用事务。

目录
打赏
0
0
0
0
4
分享
相关文章
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
131 43
SpringBoot集成Tomcat、DispatcherServlet
通过这些配置,您可以充分利用 Spring Boot 内置的功能,快速构建和优化您的 Web 应用。
52 21
SpringBoot是如何简化Spring开发的,以及SpringBoot的特性以及源码分析
Spring Boot 通过简化配置、自动配置和嵌入式服务器等特性,大大简化了 Spring 应用的开发过程。它通过提供一系列 `starter` 依赖和开箱即用的默认配置,使开发者能够更专注于业务逻辑而非繁琐的配置。Spring Boot 的自动配置机制和强大的 Actuator 功能进一步提升了开发效率和应用的可维护性。通过对其源码的分析,可以更深入地理解其内部工作机制,从而更好地利用其特性进行开发。
47 6
|
1月前
|
使用Spring Boot集成Nacos
通过上述步骤,Spring Boot应用可以成功集成Nacos,利用Nacos的服务发现和配置管理功能来提升微服务架构的灵活性和可维护性。通过这种集成,开发者可以更高效地管理和部署微服务。
222 17
SpringBoot集成Shiro权限+Jwt认证
本文主要描述如何快速基于SpringBoot 2.5.X版本集成Shiro+JWT框架,让大家快速实现无状态登陆和接口权限认证主体框架,具体业务细节未实现,大家按照实际项目补充。
92 11
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
417 12
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
87 8
SpringBoot项目打包成war包
通过上述步骤,我们成功地将一个Spring Boot应用打包成WAR文件,并部署到外部的Tomcat服务器中。这种方式适用于需要与传统Servlet容器集成的场景。
23 8
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
267 17
Spring Boot 两种部署到服务器的方式
springboot自动配置原理
Spring Boot 自动配置原理:通过 `@EnableAutoConfiguration` 开启自动配置,扫描 `META-INF/spring.factories` 下的配置类,省去手动编写配置文件。使用 `@ConditionalXXX` 注解判断配置类是否生效,导入对应的 starter 后自动配置生效。通过 `@EnableConfigurationProperties` 加载配置属性,默认值与配置文件中的值结合使用。总结来说,Spring Boot 通过这些机制简化了开发配置流程,提升了开发效率。
66 17
springboot自动配置原理