Seata提供XA模式实现分布式事务_没有引入分布式事物问题演示
初始数据库数据
正常情况
发送请求http://localhost:6001/transfer?amount=2
制造异常
在bank2微服务制造异常
异常后测试
发送请求http://localhost:6001/transfer?amount=2
Seata提供XA模式实现分布式事务_项目引入Seata
创建 UNDO_LOG 表
SEATA XA 模式需要 UNDO_LOG 表
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
添加依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
修改配置文件YML
seata: # 注册中心 registry: type: file service: # seata服务端的地址和端口信息,多个使用英文分号分隔 grouplist: default: 192.168.66.100:9999 tx-service-group: my_test_tx_group
bank1微服务开启全局事物
@Transactional
@GlobalTransactional //开启全局事务
bank2开启事物
测试分布式事物
发送请求http://localhost:6001/transfer?amount=2
总结
传统2PC(基于数据库XA协议)和Seata实现2PC的两种2PC方案, 由于Seata的零入侵并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。
XA强一致性分布式事务实战_Atomikos介绍
简单介绍
产品分两个版本:
1、TransactionEssentials:开源的免费产品;
2、ExtremeTransactions:上商业版,需要收费。
这两个产品的关系如下图所示:
什么是JTA
Java事务API(JTA:Java Transaction API)和它的同胞Java事务服 务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)的能力。
注意:
要想使用用 JTA 事务,那么就需要有一个实现 javax.sql.XADataSource 、 javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驱动程序。一个实现了这些 接口的驱动程序将可以参与 JTA 事务。一个 XADataSource 对象就是一个 XAConnection 对象的工厂。XAConnection 是参与 JTA 事务的JDBC 连接。
XA强一致性分布式事务实战_业务说明
场景介绍
本案例使用Atomikos框架实现XA强一致性分布式事务,模拟跨库转账的业务场景。不同账户之间的转账操作通过同一个项目程序完成。
说明:
转账服务不会直接连接数据库进行转账操作,而是通过 Atomikos框架对数据库连接进行封装,通过Atomikos框架操作 不同的数据库。由于Atomikos框架内部实现了XA分布式事务协 议,因此转账服务的逻辑处理不用关心分布式事务是如何实现的,只需要关注具体的业务逻辑。
框架选型
user_account数据表结构
设计完数据表后,在192.168.66.100服务器创建2个数据库,分别 为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。
DROP TABLE IF EXISTS `user_account`; CREATE TABLE `user_account` ( `account_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户编号' , `account_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户 名称' , `account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额' , PRIMARY KEY (`account_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
添加数据
tx-xa-01库中添加数据。
INSERT INTO `user_account` VALUES ('1001','张三', 10000.00);
tx-xa-02库中添加数据。
INSERT INTO `user_account` VALUES ('1002','李四', 10000.00);
XA强一致性分布式事务实战_项目搭建
创建atomikos-xa项目
创建依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- druid连接池依赖组件--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.22</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
编写配置文件
server: port: 6003 spring: autoconfigure: #停用druid连接池的自动配置 exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure datasource: #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理 type: com.alibaba.druid.pool.xa.DruidXADataSource #以下是自定义字段 dynamic: primary: master datasource: master: url: jdbc:mysql://192.168.66.102:3306/tx-xa-01? useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true username: root password01: 123456 driver-class-name: com.mysql.jdbc.Driver slave: url: jdbc:mysql://192.168.66.102:3306/tx-xa-02?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true username: root password01: 123456 driver-class-name: com.mysql.jdbc.Driver validation-query: SELCET 1
编写主启动类
@Slf4j @SpringBootApplication @EnableTransactionManagement(proxyTargetClass = true) public class TxXaStarter { public static void main(String[] args){ SpringApplication.run(TxXaStarter.class,args); log.info("*************** TxXaStarter*********"); } }
XA强一致性分布式事务实战_多数据源实现
创建第一个数据源的配置类DBConfig1
@Data @ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master") public class DBConfig1 { private String url; private String username; private String password; private String dataSourceClassName; }
创建第二个数据源的配置类DBConfig2
@Data @ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave") public class DBConfig2 { private String url; private String username; private String password; private String dataSourceClassName; }
创建持久层接口
分别在com.itbaizhan.mapper1包和com.itbaizhan.mapper2包下创建UserAccount1Mapper接口和UserAccount2Mapper接口。
public interface UserAccount1Mapper extends BaseMapper<UserAccount> {} public interface UserAccount2Mapper extends BaseMapper<UserAccount> {}
创建MyBatisConfig1类
MyBatisConfig1类的作用是整合Atomikos框架,读取DBConfig1类 中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。
@Configuration @MapperScan(basePackages = "com.tong.mapper1", sqlSessionTemplateRef = "masterSqlSessionTemplate") public class MyBatisConfig1 { @Bean(name = "masterDataSource") public DataSource masterDataSource(DBConfig1 dbConfig1) { AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean(); sourceBean.setUniqueResourceName("masterDataSource"); sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName()); sourceBean.setTestQuery("select 1"); sourceBean.setBorrowConnectionTimeout(3); MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUser(dbConfig1.getUsername()); dataSource.setPassword(dbConfig1.getPassword()); dataSource.setUrl(dbConfig1.getUrl()); sourceBean.setXaDataSource(dataSource); return sourceBean; } @Bean(name = "masterSqlSessionFactory") public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception { MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource); return sqlSessionFactoryBean.getObject(); } @Bean(name = "masterSqlSessionTemplate") public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ return new SqlSessionTemplate(sqlSessionFactory); } }
创建MyBatisConfig2类
MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过 MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合 了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。
@Configuration @MapperScan(basePackages = "com.tong.mapper2", sqlSessionTemplateRef = "slaveSqlSessionTemplate") public class MyBatisConfig2 { @Bean(name = "slaveDataSource") public DataSource slaveDataSource(DBConfig2 dbConfig2) { AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean(); sourceBean.setUniqueResourceName("slaveDataSource"); sourceBean.setXaDataSourceClassName(dbConfig2.getDataSourceClassName()); sourceBean.setTestQuery("select 1"); sourceBean.setBorrowConnectionTimeout(3); MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUser(dbConfig2.getUsername()); dataSource.setPassword(dbConfig2.getPassword()); dataSource.setUrl(dbConfig2.getUrl()); sourceBean.setXaDataSource(dataSource); return sourceBean; } @Bean(name = "slaveSqlSessionFactory") public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception { MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource); return sqlSessionFactoryBean.getObject(); } @Bean(name = "slaveSqlSessionTemplate") public SqlSessionTemplate slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ return new SqlSessionTemplate(sqlSessionFactory); } }