Copyright belongs to the author. For any form of reproduction, please contact the author for authorization and credit the source.
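The code in this article is from Spring 5.1.2.
How Spring controls transaction commit and rollback
Once a method is annotated with @Transactional, Spring takes over transaction control for it. Before the method actually runs, Spring performs a series of steps. Let's look at what they are.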
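Start by setting a debug breakpoint in the intercept method of CglibAopProxy.class, or in the invoke method of JdkDynamicAopProxy.class if the target class implements an interface. Spring picks the dynamic proxy class according to how the target is implemented. Readers unfamiliar with this can see my other article, "Understanding Dynamic Proxies Through Spring AOP" (《通过Spring Aop 了解动态代理》).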
CglibAopProxy.class (decompiled, excerpt from the intercept method)

    Class<?> targetClass = target != null ? target.getClass() : null;
    // debugger: targetClass = class com.example.threadtransaction.service.UserService
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    // debugger: chain size = 1
    Object retVal;
    if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
        retVal = methodProxy.invoke(target, argsToUse);
    } else {
        retVal = (new CglibAopProxy.CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
    }
    retVal = CglibAopProxy.processReturnType(proxy, target, method, retVal);
    var16 = retVal;
    } finally {
        if (target != null && !targetSource.isStatic()) {
JdkDynamicAopProxy.class (decompiled, excerpt from the invoke method)

    }
    target = targetSource.getTarget();
    Class<?> targetClass = target != null ? target.getClass() : null;
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    if (chain.isEmpty()) {
        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
        retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
    } else {
        MethodInvocation invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
        retVal = invocation.proceed();
    }
    Class<?> returnType = method.getReturnType();
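To make the interface-based branch more concrete, here is a minimal sketch of a JDK dynamic proxy that wraps a call with "before" and "after" steps. It uses only java.lang.reflect.Proxy from the JDK; the UserService interface, the wrap helper and the EVENTS list are hypothetical names chosen for illustration and are not Spring code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class JdkProxySketch {

    /** Hypothetical business interface; a JDK proxy can only be built for interfaces. */
    interface UserService {
        void insert();
    }

    /** Records what happened, in order, so the call flow can be inspected. */
    static final List<String> EVENTS = new ArrayList<>();

    /** Wraps the target in a proxy that logs before and after every method call. */
    static UserService wrap(UserService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("before " + method.getName());
            try {
                Object result = method.invoke(target, args); // reflective call to the real object
                EVENTS.add("after " + method.getName());
                return result;
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the exception raised by the target itself
            }
        };
        return (UserService) Proxy.newProxyInstance(
                UserService.class.getClassLoader(),
                new Class<?>[] {UserService.class},
                handler);
    }

    public static void main(String[] args) {
        UserService proxy = wrap(() -> EVENTS.add("insert executed"));
        proxy.insert();
        System.out.println(EVENTS);
        System.out.println(Proxy.isProxyClass(proxy.getClass()));
    }
}
```

A class that implements no interface cannot be proxied this way, which is why Spring falls back to a CGLIB-generated subclass in that case.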
The target object and target method that the dynamic proxy stands in for can be seen in the debugger's Evaluate window:

    Expression: new CglibAopProxy.CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)
    Result: CglibAopProxy$CglibMethodInvocation@8563
        methodProxy = MethodProxy@6547
        publicMethod = true
        proxy = UserService$$EnhancerBySpringCGLIB$$34ea1d76 "com.example.threadtransaction.service.UserService@7a5829b8"
        target = UserService@6578
        method = Method@6545 (a public void method of com.example.threadtransaction.service.UserService, declared with throws java.sql.SQLException)
        arguments = Object[0]@8565
        targetClass = Class@5043 "class com.example.threadtransaction.service.UserService"
        userAttributes = null
        interceptorsAndDynamicMethodMatchers = ArrayList@6592, size = 1
        currentInterceptorIndex = -1
Stepping in brings us to ReflectiveMethodInvocation.class. Keep stepping until the call to invoke at the end of proceed():

    public final Object[] getArguments() {
        return this.arguments;
    }

    public void setArguments(Object... arguments) {
        this.arguments = arguments;
    }

    @Nullable
    public Object proceed() throws Throwable {
        if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
            return this.invokeJoinpoint();
        } else {
            Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
            if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
                InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
                Class<?> targetClass = this.targetClass != null ? this.targetClass : this.method.getDeclaringClass();
                return dm.methodMatcher.matches(this.method, targetClass, this.arguments) ? dm.interceptor.invoke(this) : this.proceed();
            } else {
                // debugger: interceptorOrInterceptionAdvice = TransactionInterceptor@8643
                return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
            }
        }
    }
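The index-driven walk through the interceptor list can be modelled in a few lines. The sketch below is a simplified model, not Spring's implementation: Interceptor, Invocation and run are hypothetical names, dynamic method matchers are left out, and the second "audit" interceptor is an assumption added only to show the ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class InterceptorChainSketch {

    /** Hypothetical stand-in for an AOP method interceptor. */
    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    /** Simplified model of proceed(): an index walks the interceptor list, then the target runs. */
    static final class Invocation {
        private final List<Interceptor> interceptors;
        private final Supplier<Object> target;
        private int currentInterceptorIndex = -1; // same starting value as in the debugger output

        Invocation(List<Interceptor> interceptors, Supplier<Object> target) {
            this.interceptors = interceptors;
            this.target = target;
        }

        Object proceed() {
            if (currentInterceptorIndex == interceptors.size() - 1) {
                return target.get(); // chain exhausted: call the real method
            }
            Interceptor next = interceptors.get(++currentInterceptorIndex);
            return next.invoke(this); // the interceptor decides when to call proceed() again
        }
    }

    /** Runs a two-interceptor chain and returns the order in which things happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Interceptor tx = invocation -> {
            log.add("tx: begin");
            Object result = invocation.proceed();
            log.add("tx: commit");
            return result;
        };
        Interceptor audit = invocation -> {
            log.add("audit: enter");
            return invocation.proceed();
        };
        Invocation chain = new Invocation(Arrays.asList(tx, audit), () -> {
            log.add("target: insert");
            return "done";
        });
        chain.proceed();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each interceptor calls proceed() itself, code placed after that call (such as the commit) runs only once everything deeper in the chain, including the target method, has returned.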
Stepping into invoke brings us to TransactionInterceptor.class (decompiled):

        this.setTransactionAttributes(attributes);
    }

    public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
        this.setTransactionManager(ptm);
        this.setTransactionAttributeSource(tas);
    }

    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method var10001 = invocation.getMethod();
        invocation.getClass();
        return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
    }

    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.defaultWriteObject();
Continuing on, step into invokeWithinTransaction(), which is defined in TransactionAspectSupport.class:

     * @param targetClass the target class that we're invoking the method on
     * @param invocation the callback to use for proceeding with the target invocation
     * @return the return value of the method, if any
     * @throws Throwable propagated from the target invocation
     */
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {
        // debugger: invocation = TransactionInterceptor$Lambda@8705

        // Basic transaction information
        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();   // an AnnotationTransactionAttributeSource
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);   // debugger: DataSourceTransactionManager@8674
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // Create a transaction if one is needed
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            // debugger: txInfo carries "PROPAGATION_REQUIRED,ISOLATION_DEFAULT" (propagation and isolation level)
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();   // call the business method
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);   // exception caught: roll back
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);   // commit
            return retVal;
        }

        else {
            final ThrowableHolder throwableHolder = new ThrowableHolder();
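The begin / invoke / rollback-or-commit shape of this method can be tried out without Spring. The following sketch is a simplified model under stated assumptions: RecordingTxManager is a hypothetical class that only records the calls it receives, and because a Supplier cannot throw checked exceptions the sketch catches RuntimeException and Error, whereas the real method catches Throwable and consults the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TransactionTemplateSketch {

    /** Hypothetical transaction manager that only records the calls it receives. */
    static final class RecordingTxManager {
        final List<String> calls = new ArrayList<>();
        void begin()    { calls.add("begin"); }
        void commit()   { calls.add("commit"); }
        void rollback() { calls.add("rollback"); }
    }

    /** Same control flow as the listing: begin, run, roll back on failure, clean up, commit. */
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> businessMethod) {
        tm.begin();
        T result;
        try {
            result = businessMethod.get();   // call the business method
        } catch (RuntimeException | Error ex) {
            tm.rollback();                   // failure: roll back, then rethrow to the caller
            throw ex;
        } finally {
            tm.calls.add("cleanup");         // always runs, on success and on failure
        }
        tm.commit();                         // reached only when no exception escaped
        return result;
    }

    public static void main(String[] args) {
        RecordingTxManager ok = new RecordingTxManager();
        invokeWithinTransaction(ok, () -> "done");
        System.out.println(ok.calls);

        RecordingTxManager failing = new RecordingTxManager();
        try {
            invokeWithinTransaction(failing, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println(failing.calls);
        }
    }
}
```

Note that the commit sits after the try block rather than inside it, so an exception thrown by the commit itself is not routed into the rollback branch.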
Step into createTransactionIfNecessary(tm, txAttr, joinpointIdentification):

     * @return a TransactionInfo object, whether or not a transaction was created.
     * The {@code hasTransaction()} method on TransactionInfo can be used to
     * tell if there was a transaction created.
     * @see #getTransactionAttributeSource()
     */
    @SuppressWarnings("serial")
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);   // this is where the transaction is created
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
When an exception is thrown, execution enters completeTransactionAfterThrowing(). It calls rollback() only if the transaction attribute's rollbackOn(ex) returns true; otherwise it commits:

    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            }
            else {
                // We don't roll back on this exception.
                // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }
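The rollbackOn(ex) check is the reason an exception does not always lead to a rollback. By default Spring rolls back on RuntimeException and Error but not on checked exceptions, unless the annotation's rollbackFor attribute says otherwise. The sketch below restates that default rule in plain Java; RollbackRuleSketch and completeAfterThrowing are hypothetical names, and custom rules such as rollbackFor are not modelled.

```java
import java.sql.SQLException;

class RollbackRuleSketch {

    /** The default rule: unchecked exceptions and errors roll back, checked exceptions do not. */
    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    /** Mirrors the two branches of completeTransactionAfterThrowing. */
    static String completeAfterThrowing(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        System.out.println(completeAfterThrowing(new IllegalStateException("unchecked")));
        System.out.println(completeAfterThrowing(new SQLException("checked")));
        System.out.println(completeAfterThrowing(new OutOfMemoryError("error")));
    }
}
```

This matters for the UserService method seen in the debugger, which declares throws java.sql.SQLException: under the default rule a SQLException escaping that method would take the commit branch.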
If no exception is thrown, execution continues to commitTransactionAfterReturning():

    /**
     * Execute after successful completion of call, but not after an exception was handled.
     * Do nothing if we didn't create a transaction.
     * @param txInfo information about the current transaction
     */
    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }
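The above only covers the overall processing flow. Since there is no flowchart, some readers may find it hard to follow from the text alone. I recommend stepping through the same code in your own project after reading, to get the order of the calls straight. If anyone would like to contribute a flowchart for this article, you are welcome to!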
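How does the @Transactional annotation take effect?
As described above, a transactional call first enters the intercept() method of CglibAopProxy.class or the invoke() method of JdkDynamicAopProxy.class. Both are AOP proxy classes, so the call passes through the interceptor chain (here, TransactionInterceptor) before it reaches the target method. That is why the @Transactional annotation can take effect.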
Now look again at the createTransactionIfNecessary() method of TransactionAspectSupport.class:

    @SuppressWarnings("serial")
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // txAttr already has a value here; it comes from the @Transactional annotation on the business method.
                // debugger: txAttr = RuleBasedTransactionAttribute@8710 "PROPAGATION_REQUIRED,ISOLATION_DEFAULT"
                status = tm.getTransaction(txAttr);   // obtain the transaction
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
Step into tm.getTransaction(txAttr), which brings us to AbstractPlatformTransactionManager.class:

     * @see #doBegin
     */
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction();   // get the current transaction context

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }

        if (isExistingTransaction(transaction)) {   // a transaction already exists
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        // MANDATORY without an existing transaction: throw an exception
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // These three propagation types create a transaction
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {   // all other cases: no transaction is used
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
The @Transactional annotation is defined as follows. Note that the propagation attribute defaults to REQUIRED:

    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface Transactional {

        /**
         * Alias for {@link #transactionManager}.
         * @see #transactionManager
         */
        @AliasFor("transactionManager")
        String value() default "";

        /**
         * A <em>qualifier</em> value for the specified transaction.
         * <p>May be used to determine the target transaction manager,
         * matching the qualifier value (or the bean name) of a specific
         * {@link org.springframework.transaction.PlatformTransactionManager}
         * bean definition.
         * @since 4.2
         * @see #value
         */
        @AliasFor("value")
        String transactionManager() default "";

        /**
         * The transaction propagation type.
         * <p>Defaults to {@link Propagation#REQUIRED}.
         * @see org.springframework.transaction.interceptor.TransactionAttribute#getPropagationBe
         */
        Propagation propagation() default Propagation.REQUIRED;

        /**
         * The transaction isolation level.
         * <p>Defaults to {@link Isolation#DEFAULT}.
         * <p>Exclusively designed for use with {@link Propagation#REQUIRED} or
         * {@link Propagation#REQUIRES_NEW} since it only applies to newly started
         * transactions. Consider switching the "validateExistingTransactions" flag to
If a transaction already exists, execution enters the handleExistingTransaction method:

    /**
     * Create a TransactionStatus for an existing transaction.
     */
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        // NEVER: throw an exception
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        // NOT_SUPPORTED: suspend the transaction
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        // REQUIRES_NEW: suspend the current transaction and create a new one
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);   // suspend the current transaction
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        // NESTED: create a nested transaction
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            // Check whether the environment allows savepoints (rollback points)
            if (useSavepointForNestedTransaction()) {
                // Create a transaction with a savepoint
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Savepoints cannot be used: begin a new transaction, the same way as before
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        // All other cases: join the current transaction and reuse its context
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
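The two listings (getTransaction for the "no existing transaction" case and handleExistingTransaction for the other) boil down to one decision table. The sketch below condenses it into a single function. It is a simplified model: the enum and the returned descriptions are hypothetical, and for NESTED it assumes nested transactions are allowed and savepoints are used, which is one of the two branches shown above.

```java
class PropagationDecisionSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    /** Returns a description of what the transaction manager does for the given situation. */
    static String decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            switch (propagation) {
                case NEVER:
                    return "throw IllegalTransactionStateException";
                case NOT_SUPPORTED:
                    return "suspend current transaction, run without transaction";
                case REQUIRES_NEW:
                    return "suspend current transaction, begin new transaction";
                case NESTED:
                    return "create savepoint in current transaction"; // assumes savepoints are usable
                default:
                    return "join current transaction"; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        switch (propagation) {
            case MANDATORY:
                return "throw IllegalTransactionStateException";
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return "begin new transaction";
            default:
                return "run without transaction"; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + " / no transaction: " + decide(p, false));
            System.out.println(p + " / existing transaction: " + decide(p, true));
        }
    }
}
```

Running main prints all fourteen combinations, which can serve as a quick reference while stepping through the real code.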
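[Figure: simple sequence diagram]
What are the different types of transactions?
Transaction propagation attributes in Spring
- Suspend: the new method does not use the outer transaction context; the outer transaction is set aside until the method returns.
- Nested transaction: one transaction contains another.
- Savepoint: a point recorded when a nested transaction starts, which the transaction can later roll back to.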
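What difference does the transaction type make?
Let's use REQUIRES_NEW and NESTED as examples to explain nested transactions and savepoints.
Here is some pseudocode.
Scenario 1: the inner method throws.

    @Transactional // default propagation (REQUIRED)
    buyShopping() {
        insertOrder();
        stockService.updateStock();
    }

    // With REQUIRES_NEW: the stock deduction rolls back in its own transaction. The order insert
    // succeeds only if buyShopping() catches the exception; otherwise it rolls back as well.
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    // With NESTED: the stock deduction rolls back to the savepoint. Because the exception
    // propagates out of buyShopping(), the order insert rolls back as well.
    @Transactional(propagation = Propagation.NESTED)
    updateStock() {
        // business code omitted
        // decrement the stock by 1
        throw Exception;
    }

Scenario 2: the outer method throws after the inner method has returned.

    @Transactional // default propagation (REQUIRED)
    buyShopping() {
        insertOrder();
        stockService.updateStock();
        throw Exception;
    }

    // With REQUIRES_NEW: the stock deduction succeeds (it committed in a new transaction),
    // and the order insert fails and rolls back.
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    // With NESTED: the order insert fails and rolls back, and the stock deduction rolls back
    // with it (savepoint mechanism: the nested work belongs to the outer transaction).
    @Transactional(propagation = Propagation.NESTED)
    updateStock() {
        // business code omitted
        // decrement the stock by 1
    }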
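The difference between the two propagation types can be simulated with a toy transaction that buffers writes until commit. This is an illustrative model only, not how Spring or a database implements transactions: ToyTx and both scenario methods are hypothetical, a savepoint is modelled as a position in the write buffer, and the first method assumes the outer method catches the inner exception.

```java
import java.util.ArrayList;
import java.util.List;

class NestedVsRequiresNewSketch {

    /** Toy transaction: buffers writes until commit; a savepoint is a position in the buffer. */
    static final class ToyTx {
        private final List<String> database;              // shared, committed rows
        private final List<String> pending = new ArrayList<>();

        ToyTx(List<String> database) { this.database = database; }

        void write(String row)         { pending.add(row); }
        int savepoint()                { return pending.size(); }
        void rollbackTo(int savepoint) { pending.subList(savepoint, pending.size()).clear(); }
        void commit()                  { database.addAll(pending); pending.clear(); }
        void rollback()                { pending.clear(); }
    }

    /** Inner method fails; the outer method is assumed to catch the exception and carry on. */
    static List<String> innerFailsOuterCatches(boolean requiresNew) {
        List<String> database = new ArrayList<>();
        ToyTx outer = new ToyTx(database);
        outer.write("order");
        if (requiresNew) {
            ToyTx inner = new ToyTx(database);   // independent transaction
            inner.write("stock-1");
            inner.rollback();                    // only the inner transaction is undone
        } else {
            int savepoint = outer.savepoint();   // NESTED: savepoint inside the outer transaction
            outer.write("stock-1");
            outer.rollbackTo(savepoint);         // undo back to the savepoint only
        }
        outer.commit();
        return database;
    }

    /** Inner method succeeds, then the outer method fails (scenario 2). */
    static List<String> outerFailsAfterInner(boolean requiresNew) {
        List<String> database = new ArrayList<>();
        ToyTx outer = new ToyTx(database);
        outer.write("order");
        if (requiresNew) {
            ToyTx inner = new ToyTx(database);
            inner.write("stock-1");
            inner.commit();                      // committed before the outer failure
        } else {
            outer.savepoint();
            outer.write("stock-1");              // still part of the outer transaction
        }
        outer.rollback();                        // the outer method threw
        return database;
    }

    public static void main(String[] args) {
        System.out.println("inner fails, REQUIRES_NEW: " + innerFailsOuterCatches(true));
        System.out.println("inner fails, NESTED:       " + innerFailsOuterCatches(false));
        System.out.println("outer fails, REQUIRES_NEW: " + outerFailsAfterInner(true));
        System.out.println("outer fails, NESTED:       " + outerFailsAfterInner(false));
    }
}
```

In this model the two propagation types differ only when the outer transaction fails: work done under REQUIRES_NEW has already been committed independently, while work done under NESTED disappears together with the outer transaction.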
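For more on nested transactions, see ==>
"How Spring Implements Nested Transactions with Savepoints" (Spring 采用保存点(Savepoint)实现嵌套事务原理)
Notes
- Do not change the propagation attribute without a good reason.
- For business code whose failure should not affect the transaction, handle the exception with try/catch.
- For business code whose failure should affect the transaction, throw an exception so that the transaction rolls back.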
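How Spring controls transaction commit and rollback
To guarantee ACID, a transaction relies on the transaction log for tracking. Every operation is written to the log file before it is actually written to the database. For example, to delete a row, the row is first marked as deleted in the log file, while the database's data files remain unchanged. Only after the whole transaction (which may contain several SQL statements) has been committed are its changes synchronized in batch to the database files on disk.
Every write in a transactional storage engine is therefore performed twice:
- First it is written to the log file. Only the operation is recorded, not the data itself, so this is much faster than writing to the database files.
- Then it is written to the database files. This write replays the operations of committed transactions recorded in the transaction log.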
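Log groups
Usually more than one log file is configured: when one file is full, the next one is used, which improves server efficiency. The space used by log records is released automatically once they have been synchronized to disk. A single log file should not be too large; if it is, the mysql process may crash while synchronizing the log to the data files.
What the transaction log is for
The transaction log helps make transactions more efficient. With a transaction log, the storage engine only needs to modify the in-memory copy of the table data and then record the change in the transaction log, which is persisted on disk, instead of persisting the modified data itself every time. The transaction log is append-only, so writing it is sequential I/O within a small area of the disk, unlike random I/O, which requires the disk to seek to many places. This makes the transaction log approach much faster. Once the log record is durable, the in-memory changes are flushed back to disk gradually in the background. If the system crashes in the meantime, the storage engine uses the transaction log to recover the modified data automatically on restart.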
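The "log first, data files later" idea and crash recovery can be sketched as a toy model. Nothing here reflects MySQL's actual redo or undo log format: ToyEngine, LogRecord and their fields are hypothetical, and the data file is modelled as a plain in-memory map.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class WriteAheadLogSketch {

    /** One log record: either a write made by a transaction, or that transaction's commit marker. */
    static final class LogRecord {
        final int txId;
        final String key;
        final String value;
        final boolean commit;

        LogRecord(int txId, String key, String value, boolean commit) {
            this.txId = txId;
            this.key = key;
            this.value = value;
            this.commit = commit;
        }
    }

    static final class ToyEngine {
        final List<LogRecord> log = new ArrayList<>();              // append-only, written first
        final Map<String, String> dataFile = new LinkedHashMap<>(); // updated later from the log

        void write(int txId, String key, String value) {
            log.add(new LogRecord(txId, key, value, false));        // the data file is untouched
        }

        void commit(int txId) {
            log.add(new LogRecord(txId, null, null, true));
        }

        /** Crash recovery: replay only the writes of transactions whose commit marker is in the log. */
        void recover() {
            Set<Integer> committed = new HashSet<>();
            for (LogRecord record : log) {
                if (record.commit) {
                    committed.add(record.txId);
                }
            }
            for (LogRecord record : log) {
                if (!record.commit && committed.contains(record.txId)) {
                    dataFile.put(record.key, record.value);
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyEngine engine = new ToyEngine();
        engine.write(1, "stock", "9");
        engine.commit(1);
        engine.write(2, "order", "A");   // transaction 2 never commits: simulated crash
        engine.recover();
        System.out.println(engine.dataFile);
    }
}
```

The write of transaction 2 is present in the log but has no commit marker, so recovery skips it; this is how an uncommitted transaction ends up having no effect on the data file.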
For more details, see ==> "A Detailed Analysis of MySQL Transaction Logs (redo log and undo log)" (详细分析MySQL事务日志(redo log和undo log))