Spring是如何保证同一事务获取同一个Connection的?使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题【享学Spring】(中)

简介: Spring是如何保证同一事务获取同一个Connection的?使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题【享学Spring】(中)

TransactionSynchronizationManager


对它简单的解释为:使用TreadLocal记录事务的一些属性,用于应用扩展同步器的使用,在事务的开启,挂起,提交等各个点上回调应用的逻辑


// @since 02.06.2003  它是个抽象类,但是没有任何子类  因为它所有的方法都是静态的
public abstract class TransactionSynchronizationManager {
  // ======保存着一大堆的ThreadLocal 这里就是它的核心存储======
  //  应用代码随事务的声明周期绑定的对象  比如:DataSourceTransactionManager有这么做:
  //TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  // TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
  // 简单理解为当前线程的数据存储中心~~~~
  private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
  // 使用的同步器,用于应用扩展
  // TransactionSynchronization同步器是最为重要的一个扩展点~~~ 这里是个set 所以每个线程都可以注册N多个同步器
  private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
  // 事务的名称  
  private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
  // 事务是否是只读  
  private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
  // 事务的隔离级别
  private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
  // 事务是否开启   actual:真实的
  private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
  // 返回的是个只读视图
  public static Map<Object, Object> getResourceMap() {
    Map<Object, Object> map = resources.get();
    return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
  }
  public static boolean hasResource(Object key) { ... }
  public static Object getResource(Object key) { ... }
  // actualKey:确定的key  拆包后的
  @Nullable
  private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
      return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    // 如果ResourceHolder 被标记为了void空白了。此处直接从map里移除掉对应的key 
    // ~~~~~~~并且返回null~~~~~~~~~~~
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
      map.remove(actualKey);
      // Remove entire ThreadLocal if empty...
      if (map.isEmpty()) {
        resources.remove();
      }
      value = null;
    }
    return value;
  }
  // 逻辑很简单,就是和当前线程绑定一个Map,并且处理ResourceHolder 如果isVoid就抛错
  public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
      map = new HashMap<>();
      resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
      oldValue = null;
    }
    if (oldValue != null) {
      throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
          actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
  }
  public static Object unbindResource(Object key) throws IllegalStateException { ... }
  public static Object unbindResourceIfPossible(Object key) { ... }
  // 同步器是否是激活状态~~~  若是激活状态就可以执行同步器里的相关回调方法了
  public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
  }
  // 如果事务已经开启了,就不能再初始化同步器了  而是直接注册
  public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
      throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
  }
  // 注册同步器TransactionSynchronization   这个非常重要 下面有详细介绍这个接口
  // 注册的时候要求当前线程的事务已经是激活状态的  而不是随便就可以调用的哦~~~
  public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException {
    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
      throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
  }
  // 返回的是只读视图  并且,并且支持AnnotationAwareOrderComparator.sort(sortedSynchs); 这样排序~~
  public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException { ... }
  public static void clearSynchronization() throws IllegalStateException { ... }
  ... // 省略name等其余几个属性的get/set方法  因为没有任何逻辑
  // 这个方法列出来,应该下面会解释
  public static void setActualTransactionActive(boolean active) {
    actualTransactionActive.set(active ? Boolean.TRUE : null);
  }
  // 清楚所有和当前线程相关的(注意:此处只是clear清除,和当前线程的绑定而已~~~)
  public static void clear() {
    synchronizations.remove();
    currentTransactionName.remove();
    currentTransactionReadOnly.remove();
    currentTransactionIsolationLevel.remove();
    actualTransactionActive.remove();
  }
}


这里把setActualTransactionActive单独拿出来看一下,以加深对事务执行过程的了解。


TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); // 相当于表示事务为开启了


并且该类的handleExistingTransaction、prepareTransactionStatus等等方法都会此标记有调用,也就是说它会参与到事务的声明周期里面去


备注:以上方法他们统一的判断条件有:TransactionStatus.isNewTransaction()是新事务的时候才会调用这个方进行标记


另外此类它的suspend暂停的时候会直接的这么调用:

TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
TransactionSynchronizationManager.setActualTransactionActive(false);


resume恢复的时候:

TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);


大体上可以得出这样的一个处理步骤:


  1. 开启新的事务时初始化。第一次开启事务分为:real首次 或 已存在事务但是REQUIRES_NEW
  2. 在事务的嵌套过程中,TransactionSynchronizationManager属性不断更新最终清除。即外层事务挂起;事务提交,这两个点需要更新TransactionSynchronizationManager属性。
  3. 这里面有个内部类AbstractPlatformTransactionManager.SuspendedResourcesHolder它是负责事务挂起时候,保存事物属性的对象,用于恢复外层事务。当恢复外层事务时,根据SuspendedResourcesHolder对象,调用底层事务框架恢复事务属性,并恢复TransactionSynchronizationManager


DefaultTransactionStatus


它实现了TransactionStatus接口。

这个是整个事务框架最重要的状态对象,它贯穿于事务拦截器,spring抽象框架和底层具体事务实现框架之间。


它的重要任务是在新建,挂起,提交事务的过程中保存对应事务的属性。在AbstractPlatformTransactionManager中,每个事物流程都会new创建这个对象


TransactionSynchronizationUtils


这个工具类比较简单,主要是处理TransactionSynchronizationManager和执行TransactionSynchronization它对应的方法们,略~

TransactionSynchronization:事务同步器

这个类非常的重要,它是我们程序员对事务同步的扩展点:用于事务同步回调的接口,AbstractPlatformTransactionManager支持它。


注意:自定义的同步器可以通过实现Ordered接口来自己定制化顺序,若没实现接口就按照添加的顺序执行~

// @since 02.06.2003  实现了java.io.Flushable接口
public interface TransactionSynchronization extends Flushable {
  int STATUS_COMMITTED = 0;
  int STATUS_ROLLED_BACK = 1;
  int STATUS_UNKNOWN = 2;
  // 事务赞提suspend的时候调用此方法
  // 实现这个方法的目的一般是释放掉绑定的resources 
  // TransactionSynchronizationManager#unbindResource
  default void suspend() {
  }
  // 事务恢复时候调用
  // TransactionSynchronizationManager#bindResource
  default void resume() {
  }
  // 将基础会话刷新到数据存储区(如果适用) 比如Hibernate/Jpa的session
  @Override
  default void flush() {
  }
  // 在事务提交之前促发。在AbstractPlatformTransactionManager.processCommit方法里 commit之前触发
  // 事务提交之前,比如flushing SQL statements to the database
  // 请注意:若此处发生了异常,会导致回滚~
  default void beforeCommit(boolean readOnly) {
  }
  // 在beforeCommit之后,在commit/rollback之前执行
  // 它和beforeCommit还有个非常大的区别是:即使beforeCommit抛出异常了  这个也会执行
  default void beforeCompletion() {
  }
  // 这个就非常重要了,它是事务提交(注意事务已经成功提交,数据库已经持久化完成这条数据了)后执行  注意此处是成功提交而没有异常
  // javadoc说了:此处一般可以发短信或者email等操作~~因为事务已经成功提交了
  // =====但是但是但是:======
  // 事务虽然已经提交,但事务资源(链接connection)可能仍然是活动的和可访问的。
  // 因此,此时触发的任何数据访问代码仍将“参与”原始事务 允许执行一些清理(不再执行提交操作!)
  // 除非它明确声明它需要在单独的事务中运行。
  default void afterCommit() {
  }
  // 和上面的区别在于:即使抛出异常回滚了  它也会执行的。它的notice同上
  default void afterCompletion(int status) {
  }
}


我们自定义一个同步器TransactionSynchronization使用得最多的是afterCommit和afterCompletion这两个方法,但是上面的note一定一定要注意,下面我用“人的语言”尝试翻译如下:


  1. 事务虽然已经提交,但是我的连接可能还是活动的(比如使用了连接池链接是不会关闭的)
  2. 若你的回调中刚好又使用到了这个链接,它会参与到原始的事务里面去
  3. 这个时候你参与到了原始事务,但是它并不会给你commit提交。(所以你在这里做的update、insert等默认都将不好使)
  4. 回收资源(链接)的时候,因为你使用的就是原始事务的资源,所以Spring事务还会给你回收掉,从而就可能导致你的程序出错


声明一下:这段白话文"翻译"是我自主的行为,目前还没有得到任何官方、第三方的描述和认可,我是第一个,旨在希望对小伙伴理解这块有所帮助,若有不对的地方请帮忙留言指正,不甚感激~


依旧为了加强理解,看看源码处是怎么个逻辑:


public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
  ...
  private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    ...
    try {
      prepareForCommit(status);
      triggerBeforeCommit(status);
      triggerBeforeCompletion(status);
      ...
      doCommit(status);
      // 事务正常提交后  当然triggerAfterCompletion方法上面回滚里有而有个执行 此处不贴出了
      try {
        triggerAfterCommit(status);
      } finally {
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
      }
    } finally {
      cleanupAfterCompletion(status);
    }
  }
  ...
  // 清楚、回收事务相关的资源~~~  并且恢复底层事务(若需要~)
  private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
      doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
      if (status.isDebug()) {
        logger.debug("Resuming suspended transaction after completion of inner transaction");
      }
      Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
      resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
  }
}


从这个代码结构里可以看到,即使triggerAfterCommit和triggerAfterCompletion全部都执行了(哪怕是抛错了),最终它一定会做的是:cleanupAfterCompletion(status);这一步会回收资源。


那这种情况怎么避免被它回收呢?其实上面JavaDoc也说了:首先是可能,其次Spring建议使用一个新事务处理来避免这种可能性发生


至于什么是新事务?比如上面的new了一个线程,那都别说新事务了,都开新线程,所以肯定是不存在此问题了的。

Spring这里指的是若你还在同一个线程里,同步进行处理的时候,建议新启一个新事务(使用PROPAGATION_REQUIRES_NEW吧~)

相关文章
|
19天前
|
缓存 关系型数据库 MySQL
高并发架构系列:数据库主从同步的 3 种方案
本文详解高并发场景下数据库主从同步的三种解决方案:数据主从同步、数据库半同步复制、数据库中间件同步和缓存记录写key同步,旨在帮助解决数据一致性问题。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
高并发架构系列:数据库主从同步的 3 种方案
|
26天前
|
数据库
什么是数据库的事务隔离级别,有什么作用
【10月更文挑战第21】什么是数据库的事务隔离级别,有什么作用
13 3
|
26天前
|
存储 关系型数据库 数据挖掘
什么是数据库的事务隔离级别
【10月更文挑战第21】什么是数据库的事务隔离级别
18 1
|
28天前
|
算法 大数据 数据库
云计算与大数据平台的数据库迁移与同步
本文详细介绍了云计算与大数据平台的数据库迁移与同步的核心概念、算法原理、具体操作步骤、数学模型公式、代码实例及未来发展趋势与挑战。涵盖全量与增量迁移、一致性与异步复制等内容,旨在帮助读者全面了解并应对相关技术挑战。
35 3
|
28天前
|
SQL Java 数据库
Spring Boot与Flyway:数据库版本控制的自动化实践
【10月更文挑战第19天】 在软件开发中,数据库的版本控制是一个至关重要的环节,它确保了数据库结构的一致性和项目的顺利迭代。Spring Boot结合Flyway提供了一种自动化的数据库版本控制解决方案,极大地简化了数据库迁移管理。本文将详细介绍如何使用Spring Boot和Flyway实现数据库版本的自动化控制。
26 2
|
1月前
|
存储 数据库 数据库管理
数据库事务安全性控制如何实现呢
【10月更文挑战第15天】数据库事务安全性控制如何实现呢
|
1月前
|
存储 数据库 数据库管理
什么是数据库事务安全性控制
【10月更文挑战第15天】什么是数据库事务安全性控制
|
1月前
|
供应链 数据库
数据库事务安全性控制有什么应用场景吗
【10月更文挑战第15天】数据库事务安全性控制有什么应用场景吗
|
11天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
26 1
|
13天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
29 4
下一篇
无影云桌面