Seata学习整理

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: seata一阶段:首先拦截sql,解析sql语句的语义,提取元数据,找到sql语句,在执行sql前生成前镜像,执行业务sql后,生成后镜像。生成seata事务锁数据,然后构建事务日志并插入事务日志表,注册分支事务。二阶段:二阶段分支提交,删除保存的事务日志数据,完成数据清理。通过异步线程批量删除在二阶段提交的分支事务日志数据。如果是二阶段回滚操作,则通过事务协调管理器执行二阶段回滚,此时资源管理器会执行回滚一阶段已经执行的业务sql语句,还原数据。

以下代码基于seata和seata-example


一、Seata使用的业务场景

在配车的业务中,我们使用了Seata的分布式事务来保证配车的业务逻辑能够正常时,才会做订单信息推送到财务系统。我们的系统配车业务一开始使用Seata的TCC模式来实现的,需要自己实现try和confirm或者cancel的逻辑。之后,由于seata推出了AT模式,之后系统采用的分布式事务使用的是AT模式。

相比如之前的TCC模式,AT模式只需要添加 @GlobalTransactional就可以实现分布式事务。

@GlobalTransactional(rollbackFor=Exception.class)
@OverridepublicIntegerconfirmCarSubmit(MatchConfirmCarDTOmatchConfirmCarDTO, LoginInfoDtologinInfoDto) {
returnretailOrderService.matchConfirmCarSubmit(null, matchConfirmCarDTO, loginInfoDto);
    }

配车的过程中,如果配了车,则需要修改库存中车辆库存状态为已锁定,配车状态为已配车,且释放原车。同时做财务信息推送。由于涉及到库存和财务信息,因此需要用到分布式事务。

那么为啥加了 @GlobalTransactional,它就可以实现分布式事务呢?

首先Seata分为两端,Seata Server和Seata Client。TC作为Seata的Server端,而RM和TM作为客户端。由于其是注解,因此,我们可以想象得到应该是基于全局事务注解。

下面我们下载seata的源码,基于seata的源码进行学习。


二、Seata服务端启动

首先启动Seata Server:

可以看到Seata Server主要做了这样几件事:

1)初始化监控度量信息

2)初始化改良版雪花算法UUID

3) 初始化SessionHolder

4) 初始化协调器TC,并将其设置为handler

5) 初始化netty服务端

publicstaticvoidmain(String[] args) throwsIOException {
//initialize the metricsLOGGER.info("------初始化监控信息-----");
MetricsManager.get().init();
LOGGER.info("------初始化改良版雪花算法UUID-----");
UUIDGenerator.init(parameterParser.getServerNode());
LOGGER.info("------初始化SessionHolder-----");
SessionHolder.init(parameterParser.getStoreMode());
// 事务协调器,相当于netty的handler角色DefaultCoordinatorcoordinator=newDefaultCoordinator(nettyRemotingServer);
LOGGER.info("------初始化协调器TC-----");
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
LOGGER.info("------初始化netty服务端-----");
nettyRemotingServer.init();
    }

其中最重点的是coordinator.init(); 事务协调器初始化。

其主要启动了5个定时任务:

handleRetryRollbacking();
handleRetryCommitting();
handleAsyncCommitting();
timeoutCheck();
undoLogDelete();

其中:

handleRetryRollbacking(); 处理重试回滚,每秒1次

handleRetryCommitting(); 处理重试提交,每秒1次

handleAsyncCommitting(); 处理异步提交,每秒1次

timeoutCheck(); 超时提交,每秒1次

undoLogDelete(); undo log删除,每3小时1次


三、Seata客户端启动

客户端启动的时候,可以看到其会执行GlobalTransactionScanner继承了InitializingBean和AbstractAutoProxyCreator。因此

@OverridepublicvoidafterPropertiesSet() {
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
            }
return;
        }
if (initialized.compareAndSet(false, true)) {
LOGGER.info("初始化客户端");
initClient();
        }
    }

io.seata.spring.annotation.GlobalTransactionScanner#initClient方法,初始化客户端:

privatevoidinitClient() {
//init TMLOGGER.info("-------TM客户端初始化-------");
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
//init RMLOGGER.info("-------RM客户端初始化-------");
RMClient.init(applicationId, txServiceGroup);
// 注册钩子方法registerSpringShutdownHook();
    }

主要做了三件事:

初始化TM,注册相关处理器,同时放入processorTable
初始化RM,注册相关处理器,同时放入processorTable
注册钩子方法

接着注册服务到seata中,然后Netty会执行channelRead执行事件处理:

protectedvoidprocessMessage(ChannelHandlerContextctx, RpcMessagerpcMessage) throwsException {
Objectbody=rpcMessage.getBody();
if (bodyinstanceofMessageTypeAware) {
// 根据messageType获取对应的处理器和线程池,processorTable 在 netty.init 时会初始化MessageTypeAwaremessageTypeAware= (MessageTypeAware) body;
finalPair<RemotingProcessor, ExecutorService>pair=this.processorTable.get((int) messageTypeAware.getTypeCode());
// 判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行if (pair!=null) {
if (pair.getSecond() !=null) {
try {
//执行处理pair.getSecond().execute(() -> {
try {
LOGGER.info("---执行处理processMessage-----");
pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwableth) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        });
                    } catch (RejectedExecutionExceptione) {
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is "+messageExecutor.getActiveCount());
if (allowDumpStack) {
Stringname=ManagementFactory.getRuntimeMXBean().getName();
Stringpid=name.split("@")[0];
intidx=newRandom().nextInt(100);
try {
Runtime.getRuntime().exec("jstack "+pid+" >d:/"+idx+".log");
                            } catch (IOExceptionexx) {
LOGGER.error(exx.getMessage());
                            }
allowDumpStack=false;
                        }
                    }
                } else {
try {
pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwableth) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }

判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行。

pair.getFirst().process(ctx, rpcMessage);

进入RM注册操作,通过Netty的RegRmProcessor。

RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata', applicationId='order-service',transactionServiceGroup='my_test_tx_group'},channel:[id: 0xb806e7c6, L:/127.0.0.1:8091 - R:/127.0.0.1:57663],client version:1.4.2

可以看到rm注册的信息:数据源连接信息、应用id、事务服务组、netty通道信息

进入Tm注册操作,通过Netty的RegTmProcessor。

TMregistersuccess,message:RegisterTMRequest{applicationId='account-service', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xf2eafbcb, L:/127.0.0.1:8091-R:/127.0.0.1:57691],clientversion:1.4.2

可以看到tm注册的信息:应用id、事务服务组、netty通道信息


四、执行业务系统业务

/*** 购买下单,模拟全局事务提交** @return*/@RequestMapping("/purchase/commit")
publicBooleanpurchaseCommit(HttpServletRequestrequest) {
businessService.purchase("1001", "2001", 1);
returntrue;
    }
/*** 减库存,下订单** @param userId* @param commodityCode* @param orderCount*/@GlobalTransactionalpublicvoidpurchase(StringuserId, StringcommodityCode, intorderCount) {
LOGGER.info("purchase begin ... xid: "+RootContext.getXID());
stockClient.deduct(commodityCode, orderCount);
orderClient.create(userId, commodityCode, orderCount);
    }

可以看到减库存,下订单信息:

此时我们可以看到有一个方法:io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

可以看到拦截器里面有一个invoke方法,此方法会获取全局事务注解和全局锁注解。

finalGlobalTransactionalglobalTransactionalAnnotation=getAnnotation(method, targetClass, GlobalTransactional.class);
finalGlobalLockglobalLockAnnotation=getAnnotation(method, targetClass, GlobalLock.class);

根据对应的注解执行对应的处理:

handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
handleGlobalLock(methodInvocation, globalLockAnnotation)

可以看到控制台打印的日志:

业务系统客户端:

发起的事务:

image-20221205015013666.png


分支事务:

image-20221205015154491.png

可以看到分支事务二阶段提交和提交状态committed。

可以看到这个过程经历了:

开启全局事务
创建全局session
session开启
channelRead执行业务处理processMessage
注册分支事务
提交全局事务
二阶段分支提交
二阶段提交发送
channelRead执行业务处理processMessage
执行处理processMessage
二阶段提交,删除分支
提交全局事务成功

其中:

在二阶段提交前,会先生成前镜像,然后执行业务sql,然后生成后镜像,准备事务日志。

完成后,会执行二阶段的提交操作。

五、总结

整个过程的操作:

一阶段:首先拦截sql,解析sql语句的语义,提取元数据,找到sql语句,在执行sql前生成前镜像,执行业务sql后,生成后镜像。生成seata事务锁数据,然后构建事务日志并插入事务日志表,注册分支事务。

二阶段:二阶段分支提交,删除保存的事务日志数据,完成数据清理。通过异步线程批量删除在二阶段提交的分支事务日志数据。如果是二阶段回滚操作,则通过事务协调管理器执行二阶段回滚,此时资源管理器会执行回滚一阶段已经执行的业务sql语句,还原数据。            

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
6月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
6月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
129 0
|
6月前
|
存储 Oracle 关系型数据库
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
90 0
|
6月前
|
开发框架 Java 数据库连接
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(下)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
97 0
|
6月前
|
Java 数据库连接 mybatis
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)(下)
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
73 0
|
存储 缓存 uml
Seata的TCC模式学习整理
Seata的TCC模式学习整理
83 0
|
缓存 Java Spring
Seata中的TCC模式学习一
Seata中的TCC模式学习一
139 0
|
存储 SpringCloudAlibaba Java
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(上)
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)
181 0
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(上)
|
SQL SpringCloudAlibaba Java
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(下)
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)
164 0
SpringCloud Alibaba学习(十二):Seata处理分布式事务(三万字提供 介绍、搭建、实战、原理一条龙服务)(下)
|
Nacos
【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密
【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密
236 0
【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密
下一篇
无影云桌面