How Spring Transactions Integrate with MyBatis: Spring Boot Transactions

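In Spring Boot, the auto-configuration class DataSourceTransactionManagerAutoConfiguration creates the JDBC-based transaction manager, DataSourceTransactionManager, which provides the transaction operations such as commit and rollback. The configuration also turns on declarative transaction management through the @EnableTransactionManagement annotation. In essence, this parses @Transactional annotations and registers an Advisor that serves as the transaction aspect.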


ProxyTransactionManagementConfiguration



@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
  @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
    // Transaction advisor (the transaction aspect)
    BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(transactionAttributeSource());
    advisor.setAdvice(transactionInterceptor());
    advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
    return advisor;
  }
  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionAttributeSource transactionAttributeSource() {
    return new AnnotationTransactionAttributeSource();
  }
  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionInterceptor transactionInterceptor() {
    TransactionInterceptor interceptor = new TransactionInterceptor();
    interceptor.setTransactionAttributeSource(transactionAttributeSource());
    if (this.txManager != null) {
      interceptor.setTransactionManager(this.txManager);
    }
    return interceptor;
  }
}


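Spring's auto-proxy creator, AspectJAwareAdvisorAutoProxyCreator, is essentially a BeanPostProcessor. The proxying logic itself lives in its ancestor class AbstractAutoProxyCreator, shown below.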



public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
    implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
  /**
   * Create a proxy with the configured interceptors if the bean is
   * identified as one to proxy by the subclass.
   * @see #getAdvicesAndAdvisorsForBean
   */
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean != null) {
      Object cacheKey = getCacheKey(bean.getClass(), beanName);
      if (!this.earlyProxyReferences.contains(cacheKey)) {
        return wrapIfNecessary(bean, beanName, cacheKey);
      }
    }
    return bean;
  }
    /**
   * Wrap the given bean if necessary, i.e. if it is eligible for being proxied.
   * @param bean the raw bean instance
   * @param beanName the name of the bean
   * @param cacheKey the cache key for metadata access
   * @return a proxy wrapping the bean, or the raw bean instance as-is
   */
  protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (beanName != null && this.targetSourcedBeans.contains(beanName)) {
      return bean;
    }
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
      return bean;
    }
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
      this.advisedBeans.put(cacheKey, Boolean.FALSE);
      return bean;
    }
    // Check here whether the transaction advisor matches this bean; if it does, create a proxy
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
      this.advisedBeans.put(cacheKey, Boolean.TRUE);
      // Create the transaction proxy
      Object proxy = createProxy(
          bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
      this.proxyTypes.put(cacheKey, proxy.getClass());
      return proxy;
    }
    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
  }
  // Other code omitted...
}
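
To make the decision in wrapIfNecessary concrete, here is a minimal, dependency-free sketch of the same idea: a bean is wrapped in a proxy only if a "pointcut" matches it. This is not Spring's implementation. The @Tx annotation, the OrderService types, and the INTERCEPTED list are hypothetical names made up for the illustration, and the sketch uses JDK dynamic proxies only, so it covers beans that implement an interface.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AutoProxySketch {

    /** Hypothetical stand-in for @Transactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Tx {}

    interface OrderService {
        @Tx
        String placeOrder(String id);
    }

    static class OrderServiceImpl implements OrderService {
        @Override
        public String placeOrder(String id) {
            return "order " + id;
        }
    }

    /** A bean with no @Tx method; it must come back unwrapped. */
    static class PlainBean implements Runnable {
        @Override
        public void run() {}
    }

    /** Records which methods went through the proxy. */
    static final List<String> INTERCEPTED = new ArrayList<>();

    /** Plays the role of the advisor's pointcut: does any interface method carry @Tx? */
    static boolean matches(Class<?> beanClass) {
        for (Class<?> itf : beanClass.getInterfaces()) {
            for (Method m : itf.getMethods()) {
                if (m.isAnnotationPresent(Tx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Plays the role of wrapIfNecessary: a proxy on a match, otherwise the raw bean. */
    static Object wrapIfNecessary(Object bean) {
        if (!matches(bean.getClass())) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.isAnnotationPresent(Tx.class)) {
                // A real interceptor would open a transaction at this point.
                INTERCEPTED.add(method.getName());
            }
            try {
                return method.invoke(bean, args);
            } catch (InvocationTargetException ex) {
                throw ex.getCause(); // surface the target's own exception
            }
        };
        return Proxy.newProxyInstance(
                bean.getClass().getClassLoader(), bean.getClass().getInterfaces(), handler);
    }

    public static void main(String[] args) {
        OrderService service = (OrderService) wrapIfNecessary(new OrderServiceImpl());
        System.out.println(Proxy.isProxyClass(service.getClass())); // true
        System.out.println(service.placeOrder("42"));               // order 42
        System.out.println(INTERCEPTED);                            // [placeOrder]
        Object plain = wrapIfNecessary(new PlainBean());
        System.out.println(Proxy.isProxyClass(plain.getClass()));   // false
    }
}
```

The caller receives the proxy instead of the raw bean, which is why the transaction logic only runs for calls that actually go through the proxy.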


How the transaction proxy works



public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  @Override
  public Object invoke(final MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
      @Override
      public Object proceedWithInvocation() throws Throwable {
        return invocation.proceed();
      }
    });
  }
  // Other code omitted...
}
// The actual implementation lives in the parent class: TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
      throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Create the transaction if one is needed
      TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
      Object retVal = null;
      try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // Roll back (or commit, depending on the rollback rules) after an exception
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        cleanupTransactionInfo(txInfo);
      }
      // Commit the transaction
      commitTransactionAfterReturning(txInfo);
      return retVal;
    } else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();
      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
            new TransactionCallback<Object>() {
              @Override
              public Object doInTransaction(TransactionStatus status) {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                  return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                  if (txAttr.rollbackOn(ex)) {
                    // A RuntimeException: will lead to a rollback.
                    if (ex instanceof RuntimeException) {
                      throw (RuntimeException) ex;
                    }
                    else {
                      throw new ThrowableHolderException(ex);
                    }
                  }
                  else {
                    // A normal return value: will lead to a commit.
                    throwableHolder.throwable = ex;
                    return null;
                  }
                }
                finally {
                  cleanupTransactionInfo(txInfo);
                }
              }
            });
        // Check result state: It might indicate a Throwable to rethrow.
        if (throwableHolder.throwable != null) {
          throw throwableHolder.throwable;
        }
        return result;
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
    //....
    }
  }
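
Stripped of the Spring plumbing, the first branch of invokeWithinTransaction is a begin / proceed / commit-or-rollback template. The sketch below shows only that control flow. RecordingTxManager is a hypothetical class that records calls instead of talking to a database, and a plain Supplier stands in for InvocationCallback. Rolling back on RuntimeException and Error mirrors Spring's default rollback rule; per-method rollback rules are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TxTemplateSketch {

    /** Hypothetical transaction manager that only records what was asked of it. */
    static class RecordingTxManager {
        final List<String> events = new ArrayList<>();

        void begin()    { events.add("begin"); }
        void commit()   { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Around advice: open a transaction, run the target, then commit or roll back. */
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> target) {
        tm.begin();
        T result;
        try {
            result = target.get();          // corresponds to proceedWithInvocation()
        } catch (RuntimeException | Error ex) {
            tm.rollback();                  // corresponds to completeTransactionAfterThrowing
            throw ex;
        }
        tm.commit();                        // corresponds to commitTransactionAfterReturning
        return result;
    }

    public static void main(String[] args) {
        RecordingTxManager ok = new RecordingTxManager();
        String value = invokeWithinTransaction(ok, () -> "saved");
        System.out.println(value + " " + ok.events);   // saved [begin, commit]

        RecordingTxManager failing = new RecordingTxManager();
        try {
            invokeWithinTransaction(failing, () -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println(failing.events);        // [begin, rollback]
        }
    }
}
```

Note that the commit happens outside the try block, as in the listing, so a failure during commit is not followed by a second rollback attempt from this method.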


How the transaction is created:



protected TransactionInfo createTransactionIfNecessary(
      PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        // Platform transaction manager; the concrete implementation here is DataSourceTransactionManager
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
// The implementation in DataSourceTransactionManager
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
    implements ResourceTransactionManager, InitializingBean {
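  // This method belongs to the parent class (AbstractPlatformTransactionManager); it is shown inside the subclass here for ease of explanation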
  @Override
  public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    //...
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
      }
      try {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // Core logic for starting the transaction
        doBegin(transaction, definition);
        // Core logic for enabling transaction synchronization; this is the link between Spring transactions and MyBatis
        prepareSynchronization(status, definition);
        return status;
      }
      catch (RuntimeException ex) {
        resume(null, suspendedResources);
        throw ex;
      }
      catch (Error err) {
        resume(null, suspendedResources);
        throw err;
      }
    }
      //...
  }
  @Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        Connection newCon = this.dataSource.getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        // Start the transaction by turning off auto-commit
        con.setAutoCommit(false);
      }
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // Bind the ConnectionHolder to the current thread via a ThreadLocal, keyed by the DataSource
      if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
      }
    }
    catch (Throwable ex) {
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, this.dataSource);
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }
  // Enable transaction synchronization
  protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
          definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
              definition.getIsolationLevel() : null);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
      TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
      // Activate transaction synchronization; MyBatis checks this flag to decide whether to take part in the transaction
      TransactionSynchronizationManager.initSynchronization();
    }
  }  
}
// Spring uses TransactionSynchronizationManager to bind transaction synchronization state to the current thread
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
      throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    // Essentially a ThreadLocal
    synchronizations.set(new LinkedHashSet<TransactionSynchronization>());
  }
// MyBatis side: SqlSessionUtils in mybatis-spring
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
    SqlSessionHolder holder;
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        //...
        holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
        // Associate the MyBatis session with the Spring transaction on the current thread
        TransactionSynchronizationManager.bindResource(sessionFactory, holder);
        TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
        holder.setSynchronizedWithTransaction(true);
        holder.requested();
      } 
    //...
}
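
The handshake in registerSessionHolder can be sketched without either framework. In the simplified model below, a session is bound to the thread and a completion callback is registered only when synchronization is active, so repeated lookups inside a transaction reuse one session. All names here (SessionSyncSketch, Session, completeTransaction) are hypothetical, and the real SqlSessionSynchronization does more, for example committing the session before the transaction commits.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class SessionSyncSketch {

    /** Hypothetical stand-in for SqlSession. */
    static class Session {
        boolean closed;
    }

    // Per-thread resources, keyed by the factory that produced them.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Per-thread callbacks; a non-null set means "synchronization is active".
    private static final ThreadLocal<Set<Runnable>> SYNCHRONIZATIONS = new ThreadLocal<>();

    static boolean isSynchronizationActive() {
        return SYNCHRONIZATIONS.get() != null;
    }

    static void initSynchronization() {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Cannot activate transaction synchronization - already active");
        }
        SYNCHRONIZATIONS.set(new LinkedHashSet<>());
    }

    /** Reuses the session bound to this thread, or creates one and binds it if a transaction is running. */
    static Session getSession(Object sessionFactory) {
        Session bound = (Session) RESOURCES.get().get(sessionFactory);
        if (bound != null) {
            return bound;
        }
        Session session = new Session();
        if (isSynchronizationActive()) {
            RESOURCES.get().put(sessionFactory, session);
            SYNCHRONIZATIONS.get().add(() -> session.closed = true);
        }
        return session;
    }

    /** Called when the transaction ends: clear the thread state, then run the callbacks. */
    static void completeTransaction() {
        Set<Runnable> callbacks = SYNCHRONIZATIONS.get();
        SYNCHRONIZATIONS.remove();
        RESOURCES.remove();
        if (callbacks != null) {
            callbacks.forEach(Runnable::run);
        }
    }

    public static void main(String[] args) {
        Object factory = new Object();
        // No transaction: every lookup creates a fresh session.
        System.out.println(getSession(factory) == getSession(factory)); // false

        initSynchronization();
        Session first = getSession(factory);
        Session second = getSession(factory);
        System.out.println(first == second);                            // true
        completeTransaction();
        System.out.println(first.closed);                               // true
    }
}
```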


Summary



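Once a transaction has been started, Spring binds the database connection to the current thread through a ThreadLocal, so that later code on the same thread obtains the same connection. It also activates transaction synchronization through TransactionSynchronizationManager, which MyBatis later checks to decide whether to take part in the transaction.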
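
The thread-binding part of this summary can be illustrated with a small sketch, assuming a simplified resource map rather than the real TransactionSynchronizationManager. A plain Object stands in for the DataSource key and a String for the ConnectionHolder; the class name and the getResourceFromAnotherThread helper are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class ThreadBoundConnectionSketch {

    // One resource map per thread, keyed by the data source.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    /** Looks the key up on a freshly started thread and returns what that thread sees. */
    static Object getResourceFromAnotherThread(Object key) {
        AtomicReference<Object> seen = new AtomicReference<>();
        Thread other = new Thread(() -> seen.set(getResource(key)));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Object dataSource = new Object();          // stands in for the DataSource key
        Object connectionHolder = "connection-1";  // stands in for the ConnectionHolder

        bindResource(dataSource, connectionHolder);
        System.out.println(getResource(dataSource) == connectionHolder); // true
        System.out.println(getResourceFromAnotherThread(dataSource));    // null
        unbindResource(dataSource);
        System.out.println(getResource(dataSource));                     // null
    }
}
```

Because the binding is per thread, work handed off to another thread inside a transactional method does not see the bound connection and therefore runs outside that transaction.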
