关联博文:
你认真研究过Spring中的@EnableTransactionManagement注解吗?
接上文SpringBoot中事务执行原理分析(二)后,本文我们分析事务执行流程中的获取事务对象 - createTransactionIfNecessary 。
【1】前置分析
也就是分析TransactionAspectSupport的invokeWithinTransaction方法中下面这行代码。
//TransactionAspectSupport TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
这里joinpointIdentification是个String,指的是我们方法的描述信息,本文这里是com.recommend.service.impl.SysAdviceServiceImpl.testSave。ptm则是PlatformTransactionManager事务管理器,txAttr是事务属性对象,记录了事务注解的信息,如下图所示:
接下来我们就开始分析createTransactionIfNecessary方法。如果需要则创建事务,也就是说是否需要创建事务是和事务的传播行为有关的。方法源码如下所示:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. // 为txAttr设置名称,如果没有设置name则采用方法名字 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } // 获取TransactionStatus TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } //准备事务信息 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
事务的创建、传播行为的判断都放生在tm.getTransaction(txAttr);
,也就是我们分析的核心方法。至于prepareTransactionInfo,则顾名思义是使用给定的事务属性和事务状态对象准备事务信息,返回一个准备好的TransactionInfo对象。
【2】获取事务状态对象
如下是AbstractPlatformTransactionManager的getTransaction方法,其首先会获取到一个事务对象,然后得到一个DefaultTransactionStatus 对象返回。这个过程中会尝试进行事务传播行为的判断、事务属性的设定等操作。
// AbstractPlatformTransactionManager#getTransaction @Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // Use defaults if no transaction definition given. // 获取事务定义,其实就是我们前面得到的TransactionAttribute TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 获取到一个事务对象 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); //如果事务存在,则检测传播行为并返回。如果当前没有事务存在,则继续处理下面逻辑 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); } // Check definition settings for new transaction. //检测事务属性中的超时属性,默认是-1 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. //如果事务传播行为是PROPAGATION_MANDATORY,则抛出异常 -- PS:使用当前的事务,如果当前没有事务,就抛出异常。 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //suspend是挂起事务,这里返回的SuspendedResourcesHolder 是null SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { //getTransactionSynchronization 得到的是0 ,总是激活事务同步 。SYNCHRONIZATION_NEVER是2 从不激活事务同步 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //获取到事务状态对象 得到一个DefaultTransactionStatus 实例对象 DefaultTransactionStatus status = newTransactionStatus( def, transaction, true, newSynchronization, debugEnabled, suspendedResources); //开始事务 doBegin(transaction, def); //准备同步 prepareSynchronization(status, def); //返回DefaultTransactionStatus return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } //事务传播行为为 PROPAGATION_SUPPORTS PROPAGATION_NOT_SUPPORTED PROPAGATION_NEVER else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 这个与第二部分区别是 transaction为null return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
我们简单梳理一下方法的逻辑,如下所示:
获取事务定义,其实就是我们前面得到的TransactionAttribute
核心方法,获取到一个事务对象
如果事务存在,则检测传播行为并返回。如果当前没有事务存在,则继续处理下面逻辑。
检测事务属性中的超时属性;
将事务的传播行为分成三个部分来进行处理:
处理 PROPAGATION_MANDATORY也就是必须已经存在事务,否则抛出异常
处理PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW 、PROPAGATION_NESTED也就是必须有事务;
处理PROPAGATION_SUPPORTS 、PROPAGATION_NOT_SUPPORTED以及PROPAGATION_NEVER也就是可不存在事务
① 获取事务对象
DataSourceTransactionManager的获取事务方法如下所示。
doGetTransaction // org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction @Override protected Object doGetTransaction() { // 实例化一个DataSourceTransactionObject DataSourceTransactionObject txObject = new DataSourceTransactionObject(); // 设置是否允许savepoint 默认为true txObject.setSavepointAllowed(isNestedTransactionAllowed()); //获取到ConnectionHolder ,顾名思义其持有了一个JDBC Connection;本文这里当前为null ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); //为DataSourceTransactionObject设置ConnectionHolder txObject.setConnectionHolder(conHolder, false); return txObject; }
保存点(savepoint)是事务过程中的一个逻辑点,用于取消部分事务,当结束事务时,会自动的删除该事务中所定义的所有保存点。当执行rollback时,通过指定保存点可以回退到指定的点。
DataSourceTransactionObject 持有一个ConnectionHolder,被DataSourceTransactionManager当做事务对象。其继承于JdbcTransactionObjectSupport,实现了SavepointManager和SmartTransactionObject接口。
TransactionSynchronizationManager是事务同步管理器。在TransactionSynchronizationManager类中,使用ThreadLocal来为不同的事务线程提供独立的资源副本,并且同时维护这些事务的配置属性和运行状态。
public abstract class TransactionSynchronizationManager { //线程上下文中保存着【DataSource:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。 private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); //事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations"); // 当前事务的名称 private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name"); // 当前事务是否是只读 private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status"); // 当前事务的隔离级别 private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level"); // 事务是否开启 private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active"); }
判断事务是否存在的代码如下所示,如果存在ConnectionHolder且transactionActive为true,表示当前已经存在事务。本文这里测试案例不存在一个已经存在的事务,则跳过handleExistingTransaction方法分析,我们另起篇章研究。
@Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
接下来我们分析事务传播行为中的第二部分逻辑。
② PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED
当事务传播行为是上面这三个时,也就是说必须有事务。我们分析其是如何开启并准备事务的。
newTransactionStatus得到一个DefaultTransactionStatus 实例对象。
// org.springframework.transaction.support.AbstractPlatformTransactionManager#newTransactionStatus protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { //newSynchronization 为true ,actualNewSynchronization 为true boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); //实例化对象 isReadOnly为false return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
也就是说DefaultTransactionStatus持有了事务对象以及状态信息比如是否同步、是否是新事务、是否只读、挂起资源对象。
③ doBegin(transaction, def)
DataSourceTransactionManager的doBegin方法如下所示,在前面获取到事务对象后。这里会尝试获取连接对象并设置一些属性。
// DataSourceTransactionManager#doBegin protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 如果事务对象中没有ConnectionHolder,或者其synchronizedWithTransaction为true if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 获取一个新的连接对象--本文这里是从HikariPool接一个连接出来 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //为事务对象设置ConnectionHolder,true表示newConnectionHolder为true txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } //设置ConnectionHolder的属性synchronizedWithTransaction为true txObject.getConnectionHolder().setSynchronizedWithTransaction(true); //获取到连接持有器持有的连接对象 con = txObject.getConnectionHolder().getConnection(); //尝试为连接设置只读属性并获取到事务的隔离级别,这里获取到的previousIsolationLevel 为null Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); //为事务对象设置隔离级别和只读属性 txObject.setPreviousIsolationLevel(previousIsolationLevel); // 默认是false txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 如果连接是自动提交的,那么重置为false,转换为手动提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } //准备事务连接,如果是只读事务则执行 SET TRANSACTION READ ONLY prepareTransactionalConnection(con, definition); //设置事务激活状态为true txObject.getConnectionHolder().setTransactionActive(true); //获取事务的超时属性,默认为 -1 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. //如果事务对象持有的ConnectionHolder 是新创建的 (newConnectionHolder=true),则绑定资源到当前线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { // 释放连接 - 归还连接给数据源 DataSourceUtils.releaseConnection(con, obtainDataSource()); //为事务对象恢复状态,设置ConnectionHolder为null,newConnectionHolder 为false txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
我们简单梳理方法逻辑如下:
如果事务对象中没有ConnectionHolder,或者其synchronizedWithTransaction为true,则获取连接对象并实例化ConnectionHolder为事务对象txObject设置属性值;
设置ConnectionHolder的属性synchronizedWithTransaction为true
获取到链接持有器持有的链接对象
尝试为连接设置只读属性并获取到事务的隔离级别,这里获取到的previousIsolationLevel 为null
为事务对象设置隔离级别和只读属性(默认为false)
如果连接是自动提交的,那么重置为false,转换为手动提交
准备事务连接,如果是只读事务则执行 SET TRANSACTION READ ONLY
设置事务激活状态为true
获取并设置事务的超时属性,默认为 -1
如果事务对象持有的ConnectionHolder 是新创建的 (newConnectionHolder=true),则绑定资源到当前线程。这里会更新ThreadLocal<Map<Object, Object>> resources对象,map中的key是数据源,value是ConnectionHolder。
如果抛出了异常,则释放连接、恢复事务对象状态
④ prepareSynchronization(status, def)
如下所示,根据需要初始化事务同步。其实也就是根据DefaultTransactionStatus 和TransactionDefinition 更新TransactionSynchronizationManager持有的几个线程上下文本地对象。
//AbstractPlatformTransactionManager#prepareSynchronization protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { // newSynchronization 为true if (status.isNewSynchronization()) { // 更新ThreadLocal<Boolean> actualTransactionActive 本文这里设置为true //actualTransactionActive 存储了事务激活状态 TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); //更新ThreadLocal<Integer> currentTransactionIsolationLevel,本文这里是null //currentTransactionIsolationLevel存储了当前线程的事务隔离级别 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); //更新ThreadLocal<Boolean> currentTransactionReadOnly //存储了当前线程的Current transaction read-only status本文这里是false TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); //更新ThreadLocal<String> currentTransactionName //存储了当前线程的Current transaction name ,本文这里是方法全名 TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); //初始化同步 Transaction synchronizations TransactionSynchronizationManager.initSynchronization(); } }
TransactionSynchronizationManager的initSynchronization方法如下所示,首先判断是否已经同步激活,如果没有则更新ThreadLocal<Set<TransactionSynchronization>> synchronizations,设置值为new LinkedHashSet<>()。
public static void initSynchronization() throws IllegalStateException { if (isSynchronizationActive()) { throw new IllegalStateException("Cannot activate transaction synchronization - already active"); } logger.trace("Initializing transaction synchronization"); synchronizations.set(new LinkedHashSet<>()); }
上面过程我们分析的是事务传播行为为 PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED的逻辑。下面我们分析一下事务传播行为为PROPAGATION_SUPPORTS 、PROPAGATION_NOT_SUPPORTED以及PROPAGATION_NEVER的逻辑。
⑤ PROPAGATION_SUPPORTS
、PROPAGATION_NOT_SUPPORTED
以及PROPAGATION_NEVER
如下所示,首先尝试获取事务的同步属性transactionSynchronization
// getTransactionSynchronization 默认值为 0 即SYNCHRONIZATION_ALWAYS boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //这里第一个null,指的是transaction,最后一个null是suspendedResources return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
prepareTransactionStatus方法如下所示,是一个模板方法。由newTransactionStatus和prepareSynchronization组成。而这两个方法我们上面已经分析过,不同的是,这里的transaction为null。
protected final DefaultTransactionStatus prepareTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
【3】准备事务信息
也就是下图中的prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);。前面我们已经得到了事务状态对象TransactionStatus并做了一些属性设置,这里我们将尝试得到一个事务信心对象TransactionInfo。
TransactionAspectSupport的prepareTransactionInfo方法如下。
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { //实例化TransactionInfo,持有事务管理器tm,事务属性对象txAttr以及事务名称等 TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); //如果事务属性对象不为null,则为其设置事务状态对象transactionStatus为传入的TransactionStatus if (txAttr != null) { // We need a transaction for this method... if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) { logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional."); } } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo; }
TransactionAspectSupport.TransactionInfo的bindToThread方法如下所示,其会获取到TransactionAspectSupport的成员ThreadLocal<TransactionInfo> transactionInfoHolder中储存的事务信息对象赋予oldTransactionInfo 。然后为transactionInfoHolder设置新值为当前的事务信息对象。
//org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#bindToThread private void bindToThread() { // Expose current TransactionStatus, preserving any existing TransactionStatus // for restoration after this transaction is complete. this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); } this.oldTransactionInfo指的是TransactionAspectSupport.TransactionInfo.oldTransactionInfo