Spring是如何保证同一事务获取同一个Connection的?使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题【享学Spring】(中)

简介: Spring是如何保证同一事务获取同一个Connection的?使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题【享学Spring】(中)

TransactionSynchronizationManager


对它简单的解释为:使用TreadLocal记录事务的一些属性,用于应用扩展同步器的使用,在事务的开启,挂起,提交等各个点上回调应用的逻辑


// @since 02.06.2003  它是个抽象类,但是没有任何子类  因为它所有的方法都是静态的
public abstract class TransactionSynchronizationManager {
  // ======保存着一大堆的ThreadLocal 这里就是它的核心存储======
  //  应用代码随事务的声明周期绑定的对象  比如:DataSourceTransactionManager有这么做:
  //TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  // TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
  // 简单理解为当前线程的数据存储中心~~~~
  private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
  // 使用的同步器,用于应用扩展
  // TransactionSynchronization同步器是最为重要的一个扩展点~~~ 这里是个set 所以每个线程都可以注册N多个同步器
  private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
  // 事务的名称  
  private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
  // 事务是否是只读  
  private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
  // 事务的隔离级别
  private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
  // 事务是否开启   actual:真实的
  private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
  // 返回的是个只读视图
  public static Map<Object, Object> getResourceMap() {
    Map<Object, Object> map = resources.get();
    return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
  }
  public static boolean hasResource(Object key) { ... }
  public static Object getResource(Object key) { ... }
  // actualKey:确定的key  拆包后的
  @Nullable
  private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
      return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    // 如果ResourceHolder 被标记为了void空白了。此处直接从map里移除掉对应的key 
    // ~~~~~~~并且返回null~~~~~~~~~~~
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
      map.remove(actualKey);
      // Remove entire ThreadLocal if empty...
      if (map.isEmpty()) {
        resources.remove();
      }
      value = null;
    }
    return value;
  }
  // 逻辑很简单,就是和当前线程绑定一个Map,并且处理ResourceHolder 如果isVoid就抛错
  public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
      map = new HashMap<>();
      resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
      oldValue = null;
    }
    if (oldValue != null) {
      throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
          actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
  }
  public static Object unbindResource(Object key) throws IllegalStateException { ... }
  public static Object unbindResourceIfPossible(Object key) { ... }
  // 同步器是否是激活状态~~~  若是激活状态就可以执行同步器里的相关回调方法了
  public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
  }
  // 如果事务已经开启了,就不能再初始化同步器了  而是直接注册
  public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
      throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
  }
  // 注册同步器TransactionSynchronization   这个非常重要 下面有详细介绍这个接口
  // 注册的时候要求当前线程的事务已经是激活状态的  而不是随便就可以调用的哦~~~
  public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException {
    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
      throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
  }
  // 返回的是只读视图  并且,并且支持AnnotationAwareOrderComparator.sort(sortedSynchs); 这样排序~~
  public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException { ... }
  public static void clearSynchronization() throws IllegalStateException { ... }
  ... // 省略name等其余几个属性的get/set方法  因为没有任何逻辑
  // 这个方法列出来,应该下面会解释
  public static void setActualTransactionActive(boolean active) {
    actualTransactionActive.set(active ? Boolean.TRUE : null);
  }
  // 清楚所有和当前线程相关的(注意:此处只是clear清除,和当前线程的绑定而已~~~)
  public static void clear() {
    synchronizations.remove();
    currentTransactionName.remove();
    currentTransactionReadOnly.remove();
    currentTransactionIsolationLevel.remove();
    actualTransactionActive.remove();
  }
}


这里把setActualTransactionActive单独拿出来看一下,以加深对事务执行过程的了解。


TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); // 相当于表示事务为开启了


并且该类的handleExistingTransaction、prepareTransactionStatus等等方法都会此标记有调用,也就是说它会参与到事务的声明周期里面去


备注:以上方法他们统一的判断条件有:TransactionStatus.isNewTransaction()是新事务的时候才会调用这个方进行标记


另外此类它的suspend暂停的时候会直接的这么调用:

TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
TransactionSynchronizationManager.setActualTransactionActive(false);


resume恢复的时候:

TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);


大体上可以得出这样的一个处理步骤:


  1. 开启新的事务时初始化。第一次开启事务分为:real首次 或 已存在事务但是REQUIRES_NEW
  2. 在事务的嵌套过程中,TransactionSynchronizationManager属性不断更新最终清除。即外层事务挂起;事务提交,这两个点需要更新TransactionSynchronizationManager属性。
  3. 这里面有个内部类AbstractPlatformTransactionManager.SuspendedResourcesHolder它是负责事务挂起时候,保存事物属性的对象,用于恢复外层事务。当恢复外层事务时,根据SuspendedResourcesHolder对象,调用底层事务框架恢复事务属性,并恢复TransactionSynchronizationManager


DefaultTransactionStatus


它实现了TransactionStatus接口。

这个是整个事务框架最重要的状态对象,它贯穿于事务拦截器,spring抽象框架和底层具体事务实现框架之间。


它的重要任务是在新建,挂起,提交事务的过程中保存对应事务的属性。在AbstractPlatformTransactionManager中,每个事物流程都会new创建这个对象


TransactionSynchronizationUtils


这个工具类比较简单,主要是处理TransactionSynchronizationManager和执行TransactionSynchronization它对应的方法们,略~

TransactionSynchronization:事务同步器

这个类非常的重要,它是我们程序员对事务同步的扩展点:用于事务同步回调的接口,AbstractPlatformTransactionManager支持它。


注意:自定义的同步器可以通过实现Ordered接口来自己定制化顺序,若没实现接口就按照添加的顺序执行~

// @since 02.06.2003  实现了java.io.Flushable接口
public interface TransactionSynchronization extends Flushable {
  int STATUS_COMMITTED = 0;
  int STATUS_ROLLED_BACK = 1;
  int STATUS_UNKNOWN = 2;
  // 事务赞提suspend的时候调用此方法
  // 实现这个方法的目的一般是释放掉绑定的resources 
  // TransactionSynchronizationManager#unbindResource
  default void suspend() {
  }
  // 事务恢复时候调用
  // TransactionSynchronizationManager#bindResource
  default void resume() {
  }
  // 将基础会话刷新到数据存储区(如果适用) 比如Hibernate/Jpa的session
  @Override
  default void flush() {
  }
  // 在事务提交之前促发。在AbstractPlatformTransactionManager.processCommit方法里 commit之前触发
  // 事务提交之前,比如flushing SQL statements to the database
  // 请注意:若此处发生了异常,会导致回滚~
  default void beforeCommit(boolean readOnly) {
  }
  // 在beforeCommit之后,在commit/rollback之前执行
  // 它和beforeCommit还有个非常大的区别是:即使beforeCommit抛出异常了  这个也会执行
  default void beforeCompletion() {
  }
  // 这个就非常重要了,它是事务提交(注意事务已经成功提交,数据库已经持久化完成这条数据了)后执行  注意此处是成功提交而没有异常
  // javadoc说了:此处一般可以发短信或者email等操作~~因为事务已经成功提交了
  // =====但是但是但是:======
  // 事务虽然已经提交,但事务资源(链接connection)可能仍然是活动的和可访问的。
  // 因此,此时触发的任何数据访问代码仍将“参与”原始事务 允许执行一些清理(不再执行提交操作!)
  // 除非它明确声明它需要在单独的事务中运行。
  default void afterCommit() {
  }
  // 和上面的区别在于:即使抛出异常回滚了  它也会执行的。它的notice同上
  default void afterCompletion(int status) {
  }
}


我们自定义一个同步器TransactionSynchronization使用得最多的是afterCommit和afterCompletion这两个方法,但是上面的note一定一定要注意,下面我用“人的语言”尝试翻译如下:


  1. 事务虽然已经提交,但是我的连接可能还是活动的(比如使用了连接池链接是不会关闭的)
  2. 若你的回调中刚好又使用到了这个链接,它会参与到原始的事务里面去
  3. 这个时候你参与到了原始事务,但是它并不会给你commit提交。(所以你在这里做的update、insert等默认都将不好使)
  4. 回收资源(链接)的时候,因为你使用的就是原始事务的资源,所以Spring事务还会给你回收掉,从而就可能导致你的程序出错


声明一下:这段白话文"翻译"是我自主的行为,目前还没有得到任何官方、第三方的描述和认可,我是第一个,旨在希望对小伙伴理解这块有所帮助,若有不对的地方请帮忙留言指正,不甚感激~


依旧为了加强理解,看看源码处是怎么个逻辑:


public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
  ...
  private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    ...
    try {
      prepareForCommit(status);
      triggerBeforeCommit(status);
      triggerBeforeCompletion(status);
      ...
      doCommit(status);
      // 事务正常提交后  当然triggerAfterCompletion方法上面回滚里有而有个执行 此处不贴出了
      try {
        triggerAfterCommit(status);
      } finally {
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
      }
    } finally {
      cleanupAfterCompletion(status);
    }
  }
  ...
  // 清楚、回收事务相关的资源~~~  并且恢复底层事务(若需要~)
  private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
      doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
      if (status.isDebug()) {
        logger.debug("Resuming suspended transaction after completion of inner transaction");
      }
      Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
      resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
  }
}


从这个代码结构里可以看到,即使triggerAfterCommit和triggerAfterCompletion全部都执行了(哪怕是抛错了),最终它一定会做的是:cleanupAfterCompletion(status);这一步会回收资源。


那这种情况怎么避免被它回收呢?其实上面JavaDoc也说了:首先是可能,其次Spring建议使用一个新事务处理来避免这种可能性发生


至于什么是新事务?比如上面的new了一个线程,那都别说新事务了,都开新线程,所以肯定是不存在此问题了的。

Spring这里指的是若你还在同一个线程里,同步进行处理的时候,建议新启一个新事务(使用PROPAGATION_REQUIRES_NEW吧~)

相关文章
|
2天前
|
SQL Java 关系型数据库
【SpringFramework】Spring事务
本文简述Spring中数据库及事务相关衍伸知识点。
26 9
|
9天前
|
Java 开发者 Spring
理解和解决Spring框架中的事务自调用问题
事务自调用问题是由于 Spring AOP 代理机制引起的,当方法在同一个类内部自调用时,事务注解将失效。通过使用代理对象调用、将事务逻辑分离到不同类中或使用 AspectJ 模式,可以有效解决这一问题。理解和解决这一问题,对于保证 Spring 应用中的事务管理正确性至关重要。掌握这些技巧,可以提高开发效率和代码的健壮性。
38 13
|
1月前
|
缓存 安全 Java
Spring高手之路26——全方位掌握事务监听器
本文深入探讨了Spring事务监听器的设计与实现,包括通过TransactionSynchronization接口和@TransactionalEventListener注解实现事务监听器的方法,并通过实例详细展示了如何在事务生命周期的不同阶段执行自定义逻辑,提供了实际应用场景中的最佳实践。
47 2
Spring高手之路26——全方位掌握事务监听器
|
1月前
|
Java 关系型数据库 数据库
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
|
12天前
|
SQL 存储 Java
数据库———事务及bug的解决
事务的一些概念,并发事务以及并发事务引起的bug,脏读,不可重复读,幻读,数据库中的隔离级别,事务的简单应用
|
2月前
|
Java 关系型数据库 数据库连接
使用 Spring Boot 执行数据库操作:全面指南
使用 Spring Boot 执行数据库操作:全面指南
157 1
|
2月前
|
JavaScript Java 关系型数据库
Spring事务失效的8种场景
本文总结了使用 @Transactional 注解时事务可能失效的几种情况,包括数据库引擎不支持事务、类未被 Spring 管理、方法非 public、自身调用、未配置事务管理器、设置为不支持事务、异常未抛出及异常类型不匹配等。针对这些情况,文章提供了相应的解决建议,帮助开发者排查和解决事务不生效的问题。
|
3月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
261 2
|
13天前
|
Java 数据库连接 Maven
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
自动装配是现在面试中常考的一道面试题。本文基于最新的 SpringBoot 3.3.3 版本的源码来分析自动装配的原理,并在文未说明了SpringBoot2和SpringBoot3的自动装配源码中区别,以及面试回答的拿分核心话术。
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
|
20天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
75 14

热门文章

最新文章