TransactionSynchronizationManager
对它简单的解释为:使用TreadLocal
记录事务的一些属性,用于应用扩展同步器的使用,在事务的开启,挂起,提交等各个点上回调应用的逻辑
// @since 02.06.2003 它是个抽象类,但是没有任何子类 因为它所有的方法都是静态的 public abstract class TransactionSynchronizationManager { // ======保存着一大堆的ThreadLocal 这里就是它的核心存储====== // 应用代码随事务的声明周期绑定的对象 比如:DataSourceTransactionManager有这么做: //TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); // TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources); // 简单理解为当前线程的数据存储中心~~~~ private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); // 使用的同步器,用于应用扩展 // TransactionSynchronization同步器是最为重要的一个扩展点~~~ 这里是个set 所以每个线程都可以注册N多个同步器 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations"); // 事务的名称 private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name"); // 事务是否是只读 private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status"); // 事务的隔离级别 private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level"); // 事务是否开启 actual:真实的 private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active"); // 返回的是个只读视图 public static Map<Object, Object> getResourceMap() { Map<Object, Object> map = resources.get(); return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap()); } public static boolean hasResource(Object key) { ... } public static Object getResource(Object key) { ... } // actualKey:确定的key 拆包后的 @Nullable private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... // 如果ResourceHolder 被标记为了void空白了。此处直接从map里移除掉对应的key // ~~~~~~~并且返回null~~~~~~~~~~~ if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; } // 逻辑很简单,就是和当前线程绑定一个Map,并且处理ResourceHolder 如果isVoid就抛错 public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == null) { map = new HashMap<>(); resources.set(map); } Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } } public static Object unbindResource(Object key) throws IllegalStateException { ... } public static Object unbindResourceIfPossible(Object key) { ... } // 同步器是否是激活状态~~~ 若是激活状态就可以执行同步器里的相关回调方法了 public static boolean isSynchronizationActive() { return (synchronizations.get() != null); } // 如果事务已经开启了,就不能再初始化同步器了 而是直接注册 public static void initSynchronization() throws IllegalStateException { if (isSynchronizationActive()) { throw new IllegalStateException("Cannot activate transaction synchronization - already active"); } logger.trace("Initializing transaction synchronization"); synchronizations.set(new LinkedHashSet<>()); } // 注册同步器TransactionSynchronization 这个非常重要 下面有详细介绍这个接口 // 注册的时候要求当前线程的事务已经是激活状态的 而不是随便就可以调用的哦~~~ public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); if (!isSynchronizationActive()) { throw new IllegalStateException("Transaction synchronization is not active"); } synchronizations.get().add(synchronization); } // 返回的是只读视图 并且,并且支持AnnotationAwareOrderComparator.sort(sortedSynchs); 这样排序~~ public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException { ... } public static void clearSynchronization() throws IllegalStateException { ... } ... // 省略name等其余几个属性的get/set方法 因为没有任何逻辑 // 这个方法列出来,应该下面会解释 public static void setActualTransactionActive(boolean active) { actualTransactionActive.set(active ? Boolean.TRUE : null); } // 清楚所有和当前线程相关的(注意:此处只是clear清除,和当前线程的绑定而已~~~) public static void clear() { synchronizations.remove(); currentTransactionName.remove(); currentTransactionReadOnly.remove(); currentTransactionIsolationLevel.remove(); actualTransactionActive.remove(); } }
这里把setActualTransactionActive单独拿出来看一下,以加深对事务执行过程的了解。
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); // 相当于表示事务为开启了
并且该类的handleExistingTransaction、prepareTransactionStatus等等方法都会此标记有调用,也就是说它会参与到事务的声明周期里面去
备注:以上方法他们统一的判断条件有:TransactionStatus.isNewTransaction()是新事务的时候才会调用这个方进行标记
另外此类它的suspend暂停的时候会直接的这么调用:
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); TransactionSynchronizationManager.setActualTransactionActive(false);
resume恢复
的时候:
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
大体上可以得出这样的一个处理步骤:
- 开启新的事务时初始化。第一次开启事务分为:real首次 或 已存在事务但是REQUIRES_NEW
- 在事务的嵌套过程中,TransactionSynchronizationManager属性不断更新最终清除。即外层事务挂起;事务提交,这两个点需要更新TransactionSynchronizationManager属性。
- 这里面有个内部类AbstractPlatformTransactionManager.SuspendedResourcesHolder它是负责事务挂起时候,保存事物属性的对象,用于恢复外层事务。当恢复外层事务时,根据SuspendedResourcesHolder对象,调用底层事务框架恢复事务属性,并恢复TransactionSynchronizationManager
DefaultTransactionStatus
它实现了TransactionStatus接口。
这个是整个事务框架最重要的状态对象,它贯穿于事务拦截器,spring抽象框架和底层具体事务实现框架之间。
它的重要任务是在新建,挂起,提交事务的过程中保存对应事务的属性。在AbstractPlatformTransactionManager中,每个事物流程都会new创建这个对象
TransactionSynchronizationUtils
这个工具类比较简单,主要是处理TransactionSynchronizationManager和执行TransactionSynchronization它对应的方法们,略~
TransactionSynchronization:事务同步器
这个类非常的重要,它是我们程序员对事务同步的扩展点:用于事务同步回调的接口,AbstractPlatformTransactionManager支持它。
注意:自定义的同步器可以通过实现Ordered接口来自己定制化顺序,若没实现接口就按照添加的顺序执行~
// @since 02.06.2003 实现了java.io.Flushable接口 public interface TransactionSynchronization extends Flushable { int STATUS_COMMITTED = 0; int STATUS_ROLLED_BACK = 1; int STATUS_UNKNOWN = 2; // 事务赞提suspend的时候调用此方法 // 实现这个方法的目的一般是释放掉绑定的resources // TransactionSynchronizationManager#unbindResource default void suspend() { } // 事务恢复时候调用 // TransactionSynchronizationManager#bindResource default void resume() { } // 将基础会话刷新到数据存储区(如果适用) 比如Hibernate/Jpa的session @Override default void flush() { } // 在事务提交之前促发。在AbstractPlatformTransactionManager.processCommit方法里 commit之前触发 // 事务提交之前,比如flushing SQL statements to the database // 请注意:若此处发生了异常,会导致回滚~ default void beforeCommit(boolean readOnly) { } // 在beforeCommit之后,在commit/rollback之前执行 // 它和beforeCommit还有个非常大的区别是:即使beforeCommit抛出异常了 这个也会执行 default void beforeCompletion() { } // 这个就非常重要了,它是事务提交(注意事务已经成功提交,数据库已经持久化完成这条数据了)后执行 注意此处是成功提交而没有异常 // javadoc说了:此处一般可以发短信或者email等操作~~因为事务已经成功提交了 // =====但是但是但是:====== // 事务虽然已经提交,但事务资源(链接connection)可能仍然是活动的和可访问的。 // 因此,此时触发的任何数据访问代码仍将“参与”原始事务 允许执行一些清理(不再执行提交操作!) // 除非它明确声明它需要在单独的事务中运行。 default void afterCommit() { } // 和上面的区别在于:即使抛出异常回滚了 它也会执行的。它的notice同上 default void afterCompletion(int status) { } }
我们自定义一个同步器TransactionSynchronization使用得最多的是afterCommit和afterCompletion这两个方法,但是上面的note一定一定要注意,下面我用“人的语言”尝试翻译如下:
- 事务虽然已经提交,但是我的连接可能还是活动的(比如使用了连接池链接是不会关闭的)
- 若你的回调中刚好又使用到了这个链接,它会参与到原始的事务里面去
- 这个时候你参与到了原始事务,但是它并不会给你commit提交。(所以你在这里做的update、insert等默认都将不好使)
- 回收资源(链接)的时候,因为你使用的就是原始事务的资源,所以Spring事务还会给你回收掉,从而就可能导致你的程序出错
声明一下:这段白话文"翻译"是我自主的行为,目前还没有得到任何官方、第三方的描述和认可,我是第一个,旨在希望对小伙伴理解这块有所帮助,若有不对的地方请帮忙留言指正,不甚感激~
依旧为了加强理解,看看源码处是怎么个逻辑:
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable { ... private void processCommit(DefaultTransactionStatus status) throws TransactionException { ... try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); ... doCommit(status); // 事务正常提交后 当然triggerAfterCompletion方法上面回滚里有而有个执行 此处不贴出了 try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } } ... // 清楚、回收事务相关的资源~~~ 并且恢复底层事务(若需要~) private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } } }
从这个代码结构里可以看到,即使triggerAfterCommit和triggerAfterCompletion全部都执行了(哪怕是抛错了),最终它一定会做的是:cleanupAfterCompletion(status);这一步会回收资源。
那这种情况怎么避免被它回收呢?其实上面JavaDoc也说了:首先是可能,其次Spring建议使用一个新事务处理来避免这种可能性发生
至于什么是新事务?比如上面的new了一个线程,那都别说新事务了,都开新线程,所以肯定是不存在此问题了的。
Spring这里指的是若你还在同一个线程里,同步进行处理的时候,建议新启一个新事务(使用PROPAGATION_REQUIRES_NEW吧~)