以下代码基于seata和seata-example
一、Seata使用的业务场景
在配车的业务中,我们使用了Seata的分布式事务来保证配车的业务逻辑能够正常时,才会做订单信息推送到财务系统。我们的系统配车业务一开始使用Seata的TCC模式来实现的,需要自己实现try和confirm或者cancel的逻辑。之后,由于seata推出了AT模式,之后系统采用的分布式事务使用的是AT模式。
相比如之前的TCC模式,AT模式只需要添加 @GlobalTransactional就可以实现分布式事务。
rollbackFor=Exception.class) (publicIntegerconfirmCarSubmit(MatchConfirmCarDTOmatchConfirmCarDTO, LoginInfoDtologinInfoDto) { returnretailOrderService.matchConfirmCarSubmit(null, matchConfirmCarDTO, loginInfoDto); }
配车的过程中,如果配了车,则需要修改库存中车辆库存状态为已锁定,配车状态为已配车,且释放原车。同时做财务信息推送。由于涉及到库存和财务信息,因此需要用到分布式事务。
那么为啥加了 @GlobalTransactional,它就可以实现分布式事务呢?
首先Seata分为两端,Seata Server和Seata Client。TC作为Seata的Server端,而RM和TM作为客户端。由于其是注解,因此,我们可以想象得到应该是基于全局事务注解。
下面我们下载seata的源码,基于seata的源码进行学习。
二、Seata服务端启动
首先启动Seata Server:
可以看到Seata Server主要做了这样几件事:
1)初始化监控度量信息
2)初始化改良版雪花算法UUID
3) 初始化SessionHolder
4) 初始化协调器TC,并将其设置为handler
5) 初始化netty服务端
publicstaticvoidmain(String[] args) throwsIOException { //initialize the metricsLOGGER.info("------初始化监控信息-----"); MetricsManager.get().init(); LOGGER.info("------初始化改良版雪花算法UUID-----"); UUIDGenerator.init(parameterParser.getServerNode()); LOGGER.info("------初始化SessionHolder-----"); SessionHolder.init(parameterParser.getStoreMode()); // 事务协调器,相当于netty的handler角色DefaultCoordinatorcoordinator=newDefaultCoordinator(nettyRemotingServer); LOGGER.info("------初始化协调器TC-----"); coordinator.init(); nettyRemotingServer.setHandler(coordinator); LOGGER.info("------初始化netty服务端-----"); nettyRemotingServer.init(); }
其中最重点的是coordinator.init(); 事务协调器初始化。
其主要启动了5个定时任务:
handleRetryRollbacking(); handleRetryCommitting(); handleAsyncCommitting(); timeoutCheck(); undoLogDelete();
其中:
handleRetryRollbacking(); 处理重试回滚,每秒1次
handleRetryCommitting(); 处理重试提交,每秒1次
handleAsyncCommitting(); 处理异步提交,每秒1次
timeoutCheck(); 超时提交,每秒1次
undoLogDelete(); undo log删除,每3小时1次
三、Seata客户端启动
客户端启动的时候,可以看到其会执行GlobalTransactionScanner继承了InitializingBean和AbstractAutoProxyCreator。因此
publicvoidafterPropertiesSet() { ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this); if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } return; } if (initialized.compareAndSet(false, true)) { LOGGER.info("初始化客户端"); initClient(); } }
io.seata.spring.annotation.GlobalTransactionScanner#initClient方法,初始化客户端:
privatevoidinitClient() { //init TMLOGGER.info("-------TM客户端初始化-------"); TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); //init RMLOGGER.info("-------RM客户端初始化-------"); RMClient.init(applicationId, txServiceGroup); // 注册钩子方法registerSpringShutdownHook(); }
主要做了三件事:
初始化TM,注册相关处理器,同时放入processorTable 初始化RM,注册相关处理器,同时放入processorTable 注册钩子方法
接着注册服务到seata中,然后Netty会执行channelRead执行事件处理:
protectedvoidprocessMessage(ChannelHandlerContextctx, RpcMessagerpcMessage) throwsException { Objectbody=rpcMessage.getBody(); if (bodyinstanceofMessageTypeAware) { // 根据messageType获取对应的处理器和线程池,processorTable 在 netty.init 时会初始化MessageTypeAwaremessageTypeAware= (MessageTypeAware) body; finalPair<RemotingProcessor, ExecutorService>pair=this.processorTable.get((int) messageTypeAware.getTypeCode()); // 判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行if (pair!=null) { if (pair.getSecond() !=null) { try { //执行处理pair.getSecond().execute(() -> { try { LOGGER.info("---执行处理processMessage-----"); pair.getFirst().process(ctx, rpcMessage); } catch (Throwableth) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } }); } catch (RejectedExecutionExceptione) { LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is "+messageExecutor.getActiveCount()); if (allowDumpStack) { Stringname=ManagementFactory.getRuntimeMXBean().getName(); Stringpid=name.split("@")[0]; intidx=newRandom().nextInt(100); try { Runtime.getRuntime().exec("jstack "+pid+" >d:/"+idx+".log"); } catch (IOExceptionexx) { LOGGER.error(exx.getMessage()); } allowDumpStack=false; } } } else { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwableth) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } } } else { LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); } } else { LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); } }
判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行。
pair.getFirst().process(ctx, rpcMessage);
进入RM注册操作,通过Netty的RegRmProcessor。
RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata', applicationId='order-service',transactionServiceGroup='my_test_tx_group'},channel:[id: 0xb806e7c6, L:/127.0.0.1:8091 - R:/127.0.0.1:57663],client version:1.4.2
可以看到rm注册的信息:数据源连接信息、应用id、事务服务组、netty通道信息
进入Tm注册操作,通过Netty的RegTmProcessor。
TMregistersuccess,message:RegisterTMRequest{applicationId='account-service', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xf2eafbcb, L:/127.0.0.1:8091-R:/127.0.0.1:57691],clientversion:1.4.2
可以看到tm注册的信息:应用id、事务服务组、netty通道信息
四、执行业务系统业务
/*** 购买下单,模拟全局事务提交** @return*/"/purchase/commit") (publicBooleanpurchaseCommit(HttpServletRequestrequest) { businessService.purchase("1001", "2001", 1); returntrue; } /*** 减库存,下订单** @param userId* @param commodityCode* @param orderCount*/publicvoidpurchase(StringuserId, StringcommodityCode, intorderCount) { LOGGER.info("purchase begin ... xid: "+RootContext.getXID()); stockClient.deduct(commodityCode, orderCount); orderClient.create(userId, commodityCode, orderCount); }
可以看到减库存,下订单信息:
此时我们可以看到有一个方法:io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
可以看到拦截器里面有一个invoke方法,此方法会获取全局事务注解和全局锁注解。
finalGlobalTransactionalglobalTransactionalAnnotation=getAnnotation(method, targetClass, GlobalTransactional.class); finalGlobalLockglobalLockAnnotation=getAnnotation(method, targetClass, GlobalLock.class);
根据对应的注解执行对应的处理:
handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); handleGlobalLock(methodInvocation, globalLockAnnotation)
可以看到控制台打印的日志:
业务系统客户端:
发起的事务:
分支事务:
可以看到分支事务二阶段提交和提交状态committed。
可以看到这个过程经历了:
开启全局事务 创建全局session session开启 channelRead执行业务处理processMessage 注册分支事务 提交全局事务 二阶段分支提交 二阶段提交发送 channelRead执行业务处理processMessage 执行处理processMessage 二阶段提交,删除分支 提交全局事务成功
其中:
在二阶段提交前,会先生成前镜像,然后执行业务sql,然后生成后镜像,准备事务日志。
完成后,会执行二阶段的提交操作。
五、总结
整个过程的操作:
一阶段:首先拦截sql,解析sql语句的语义,提取元数据,找到sql语句,在执行sql前生成前镜像,执行业务sql后,生成后镜像。生成seata事务锁数据,然后构建事务日志并插入事务日志表,注册分支事务。
二阶段:二阶段分支提交,删除保存的事务日志数据,完成数据清理。通过异步线程批量删除在二阶段提交的分支事务日志数据。如果是二阶段回滚操作,则通过事务协调管理器执行二阶段回滚,此时资源管理器会执行回滚一阶段已经执行的业务sql语句,还原数据。