三,Seata 安装
1,下载seata
官网下载地址: https://seata.io/zh-cn/blog/download.html
2,安装seata
解压seata-server-1.5.2安装包。解压出来我们进入bin目录会看到相应的启动脚本(由于这里是在Windows演示)
在 Linux/Mac 下
sh ./bin/seata-server.sh
在 Windows 下
bin\seata-server.bat
高可用部署:
Seata 的高可用依赖于注册中心、配置中心和数据库来实现。使用nacos和redis为例
这里我们看到是基于文件的方式,后面我们会有详细的课程讲解基于nacos注册中心的安装配置等
3,运行seata
- windows:bin目录下双击seata-server.bat启动
- linux:命令行启动 seata-server.sh -h 127.0.0.1 -p 8091
启动参数:
- -h: 注册到注册中心的ip
- -p: Server rpc 监听端口
- -m: 全局事务会话信息存储模式,nacos, consul, apollo, zk, etcd3,优先读取启动参数
- -n: Server node,多个Server时,需区分各自节点,用于生成不同区间的transactionId,以免冲突
- -e: 环境配置
四,项目环境搭建
1,增加 Maven 依赖
首先,您需要将 nacos-client 的 Maven 依赖添加到您的项目 pom.xml 文件中
<dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>最新版</version> </dependency> <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>1.2.0及以上版本</version> </dependency>
2,Client端配置中心
在 application.yml 中加入对应的配置中心,其余配置参考
seata: config: type: nacos nacos: server-addr: 127.0.0.1:8848 group : "SEATA_GROUP" namespace: "" username: "XXXXXX"
3,Server端配置中心
在 registry.conf 中加入对应配置中心,其余配置参考
config { type = "nacos" nacos { serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "" username = "nacos" } }
4,上传配置至Nacos配置中心
seata: config: type: nacos nacos: server-addr: 127.0.0.1:8848 group : "SEATA_GROUP" namespace: "" dataId: "seataServer.properties" username: "nacos"
五,源码分析
今天咱们开始咱们的主题了,在了解以上问题之后我们是不是比较好奇他的运行流程。了解执行流程我们就要从官网,或者源码入手来了解
1,启动类
// 【排除SpringBoot启动时自动加载GlobalTransactionAutoConfiguration.class】 @SpringBootApplication(exclude = {GlobalTransactionAutoConfiguration.class}) @EnableFeignClients // 远程调用 @EnableDiscoveryClient // 开启nacos服务 public class GatewayApplication{ public static void main(String[] args) { ErosApplication.run(ProjectNameConstatnt.GATEWAY_NAME,"consume",GatewayApplication.class,args); } }
2,配置类
@GlobalTransactional(name="XXXXX",rollbackFor = RuntimeException.class)
3,@GlobalTransactional
方法上加了@GlobalTransactional,Seata通过aop检测到之后,就会使用TM和TC通信,注册全局事务。
在@GlobalTransactional涵括的代码中,不管是本服务中的sql操作,还是feign调用别的服务的sql操作,只要sql操作满足如下:insert操作,delete操作,update操作,select for update操作。 就会被seata增强,使用RM与TC通信,注册分支事务。
全局事务:包括开启事务、提交、回滚、获取当前状态等方法。
public interface GlobalTransaction { /** * 开启一个全局事务(使用默认的事务名和超时时间) */ void begin() throws TransactionException; /** * 开启一个全局事务,并指定超时时间(使用默认的事务名) */ void begin(int timeout) throws TransactionException; /** * 开启一个全局事务,并指定事务名和超时时间 */ void begin(int timeout, String name) throws TransactionException; /** * 全局提交 */ void commit() throws TransactionException; /** * 全局回滚 */ void rollback() throws TransactionException; /** * 获取事务的当前状态 */ GlobalStatus getStatus() throws TransactionException; /** * 获取事务的 XID */ String getXid(); }
GlobalTransactionScanner继承自AbstractAutoProxyCreator,在这里拦截到加了@GlobalTransactional的方法。
GlobalTransactionScanner
- AbstractAutoProxyCreator:wrapIfNecessary(aop的核心),getAdvicesAndAdvisorsForBean(拦截器)
- InitializingBean:afterPropertiesSet(初始化TM,RM)
3.1 初始化TM,RM
@Override public void afterPropertiesSet() { //是否禁止了全局事务 if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } return; } //初始化netty 客户端(TM RM) initClient(); }
初始化TM,RM
private void initClient() { //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); }
3.2 TransactionalTemplate
GlobalTransaction 和 GlobalTransactionContext API 把一个业务服务的调用包装成带有分布式事务支持的服务。
public class TransactionalTemplate { public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException { // 1. 获取当前全局事务实例或创建新的实例 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 2. 开启全局事务 try { tx.begin(business.timeout(), business.name()); } catch (TransactionException txe) { // 2.1 开启失败 throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } Object rs = null; try { // 3. 调用业务服务 rs = business.execute(); } catch (Throwable ex) { // 业务调用本身的异常 try { // 全局回滚 tx.rollback(); // 3.1 全局回滚成功:抛出原始业务异常 throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); } catch (TransactionException txe) { // 3.2 全局回滚失败: throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex); } } // 4. 全局提交 try { tx.commit(); } catch (TransactionException txe) { // 4.1 全局提交失败: throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } return rs; } }
DataSourceProxy#getConnection
@Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); }
ConnectionProxy#doCommit
private void doCommit() throws SQLException { //处理@GlobalTransaction的分支事务 if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } //处理@GlobalLock,即检查一下是否可以获取全局锁 else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
ConnectionProxy#processGlobalTransactionCommit
模板的异常方法
class ExecutionException extends Exception { // 发生异常的事务实例 private GlobalTransaction transaction; // 异常编码: // BeginFailure(开启事务失败) // CommitFailure(全局提交失败) // RollbackFailure(全局回滚失败) // RollbackDone(全局回滚成功) private Code code; // 触发回滚的业务原始异常 private Throwable originalException;