今天继续给大家分享一下阿里的分布式事务中间件seata的使用,跟上篇文章《springboot多数据源整合分布式事务中间件seata》不一样的是,上篇文章是单服务绑定多数据源的分布式情况,而本文基于微服务下每个服务绑定一个数据源的场景,服务之间依靠eureka客户端feign进行通信。
注:seata有三种模式,AT模式、TCC模式和saga模式,上篇文章和本篇文章介绍的都是AT模式。感兴趣的同学可以参考官网进行了解这三种模式:
http://seata.io/en-us/docs/dev/mode/at-mode.html
环境搭建
还是先说一下本文使用的实验环境:
springboot:2.1.6.RELEASE
orm框架:mybatis
数据库:mysql
数据库连接池:HikariCP
seata server:1.3.0
springcloud:Greenwich.SR2
整个项目的架构如下:
可以看到,项目中有order-server,account-server,storage-server这3个服务,每个服务绑定自己的数据库。这3个服务都注册到eureka上面,同时也都注册TM到seata-server。seata-server也注册到了eureka上。
首先我们启动eureka,这里监听8889端口,yml文件配置如下:
server: port: 8889 spring: application: name: eureka-server #Eureka实例名,集群中根据这里相互识别 eureka: instance: hostname: localhost #客户端 client: #是否开启注册服务,因为这里如果为true表示自己注册自己,而自己就是一个服务注册方,没必要自己注册自己 register-with-eureka: false #是否拉取服务列表,这里我只提供服务给别的服务。 fetch-registry: false #注册中心地址 service-url: defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/ #服务端: server: enable-self-preservation: false
接着我们启动seata server,由于seata server也要注册到eureka上面,我们需要修改seata server,把registry.conf文件修改为如下:
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" #省略无关的代码 eureka { #我本地配置的eureka地址 serviceUrl = "http://192.168.59.1:8889/eureka" application = "seata-server" weight = "1" } #省略其他代码
执行如下命令启动seata server:
./seata-server.sh -p 8091 -h 127.0.0.1 -m file
配置3个微服务,这里我们以order-server为例,先看yml文件,核心配置就是eureka、feign、mybatis和数据源,代码如下:
#配置eureka eureka: instance: hostname: localhost prefer-ip-address: true client: serviceUrl: defaultZone: http://${eureka.instance.hostname}:8889/eureka/ feign: hystrix: enabled: false client: config: default: connectTimeout: 5000 readTimeout: 10000 logging: level: io: seata: info #配置mybatis mybatis: mapperLocations: classpath:mapper/*.xml typeAliasesPackage: io.seata.sample.entity server: port: 8180 spring: application: name: order-server cloud: alibaba: seata: tx-service-group: my_test_tx_group #配置数据源 datasource: driver-class-name: com.mysql.jdbc.Driver password: 123456 jdbcUrl: jdbc:mysql://192.168.59.1:3306/seata_order?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8 username: root
registry.conf文件,最核心的就是type使用eureka,然后配置eureka地址和应用名称,部分代码如下:
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" #省略其他代码 eureka { serviceUrl = "http://localhost:8889/eureka" application = "order-server" weight = "1" } #省略其他代码 file.conf文件只要改一下service就可以了,里面配置seata server地址,代码如下: service { #transaction service group mapping vgroupMapping.my_test_tx_group = "seata-server" #only support when registry.type=file, please don't set multiple addresses default.grouplist = "192.168.59.132:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false }
用这种方式配置account-server和storage-server,之后把3个服务都起来,启动成功后,访问eureka页面,地址如下:
http://localhost:8889/
这时我们能看到加上seata server一共4个服务都已经启动成功了,见下图:
下面我贴一下整个项目的sql语句:
#########################seata_order库 use database seata_order; CREATE TABLE `orders` ( `id` mediumint(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `product_id` int(11) DEFAULT NULL, `COUNT` int(11) DEFAULT NULL COMMENT '数量', `pay_amount` decimal(10,2) DEFAULT NULL, `status` varchar(100) DEFAULT NULL, `add_time` datetime DEFAULT CURRENT_TIMESTAMP, `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 #########################seata_pay库 use database seata_pay; DROP TABLE account; CREATE TABLE `account` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id', `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度', `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额', `balance` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度', `last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_pay`.`account` (`id`, `user_id`, `total`, `used`, `balance`) VALUES ('1', '1', '1000', '0', '100'); #########################seata_storage库 use database seata_storage; CREATE TABLE `storage` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT, `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id', `total` INT(11) DEFAULT NULL COMMENT '总库存', `used` INT(11) DEFAULT NULL COMMENT '已用库存', `residue` INT(11) DEFAULT NULL COMMENT '剩余库存', PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100'); #########################下面的undo_log表前面三个库都需要创建 CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8
至此,整个环境就搭建完成了,接着我们进行测试。我们先看下account和storage两张表的数据,其他表都是空,如下图:
account
storage表
测试
现在我们开始进行测试,有2个场景,正常commit和场景和异常rollback场景。我们向order-server发一个post请求,content如下:
{ "userId":1, "productId":1, "count":1, "money":1, "payAmount":50 }
这时order-server执行成功,日志如下:
2020-08-15 16:31:53.704 INFO 57868 --- [nio-8181-exec-1] i.s.sample.service.AccountServiceImpl : ------->扣减账户开始account中 2020-08-15 16:31:54.141 INFO 57868 --- [nio-8181-exec-1] i.s.sample.service.AccountServiceImpl : ------->扣减账户结束account中 2020-08-15 16:31:54.141 INFO 57868 --- [nio-8181-exec-1] i.s.sample.service.AccountServiceImpl : 修改订单状态开始 2020-08-15 16:31:54.309 INFO 57868 --- [nio-8181-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: order-server.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647 2020-08-15 16:31:54.344 INFO 57868 --- [nio-8181-exec-1] c.n.u.concurrent.ShutdownEnabledTimer : Shutdown hook installed for: NFLoadBalancer-PingTimer-order-server 2020-08-15 16:31:54.345 INFO 57868 --- [nio-8181-exec-1] c.netflix.loadbalancer.BaseLoadBalancer : Client: order-server instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=order-server,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null 2020-08-15 16:31:54.352 INFO 57868 --- [nio-8181-exec-1] c.n.l.DynamicServerListLoadBalancer : Using serverListUpdater PollingServerListUpdater 2020-08-15 16:31:54.385 INFO 57868 --- [nio-8181-exec-1] c.netflix.config.ChainedDynamicProperty : Flipping property: order-server.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647 2020-08-15 16:31:54.386 INFO 57868 --- [nio-8181-exec-1] c.n.l.DynamicServerListLoadBalancer : DynamicServerListLoadBalancer for client order-server initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=order-server,current list of Servers=[10.192.86.60:8180],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone; Instance count:1; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;] },Server stats: [[Server:10.192.86.60:8180; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0] ]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@5fa120e1 2020-08-15 16:31:54.562 INFO 57868 --- [nio-8181-exec-1] i.s.sample.service.AccountServiceImpl : 修改订单状态结束:订单状态修改成功 2020-08-15 16:31:54.578 WARN 57868 --- [nio-8181-exec-1] c.a.c.seata.web.SeataHandlerInterceptor : xid in change during RPC from 192.168.59.132:8091:37575557075451904 to null 2020-08-15 16:31:55.166 INFO 57868 --- [ch_RMROLE_1_1_8] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:xid=192.168.59.132:8091:37575557075451904,branchId=37575566823014400,branchType=AT,resourceId=jdbc:mysql://192.168.59.1:3306/seata_pay,applicationData=null 2020-08-15 16:31:55.169 INFO 57868 --- [ch_RMROLE_1_1_8] io.seata.rm.AbstractRMHandler : Branch committing: 192.168.59.132:8091:37575557075451904 37575566823014400 jdbc:mysql://192.168.59.1:3306/seata_pay null 2020-08-15 16:31:55.170 INFO 57868 --- [ch_RMROLE_1_1_8] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed 2020-08-15 16:31:55.356 INFO 57868 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty : Flipping property: order-server.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
这时我们再看下数据库的数据,如下图:
order表
account表
storage表
容易迷惑我们的是,order表的pay_amount值是45.5,而不是50,这时因为account-server执行完成后,又调用了order-server,执行了更新操作,代码如下:
public void decrease(Long userId, BigDecimal payAmount) { LOGGER.info("------->扣减账户开始account中"); //模拟超时异常,全局事务回滚 //try { // Thread.sleep(30*1000); //} catch (InterruptedException e) { // e.printStackTrace(); //} accountDao.decrease(userId,payAmount); LOGGER.info("------->扣减账户结束account中"); //修改订单状态,此调用会导致调用成环 LOGGER.info("修改订单状态开始"); String mes = orderApi.update(userId, payAmount.multiply(new BigDecimal("0.09")),0); LOGGER.info("修改订单状态结束:{}",mes); }
第二个场景的测试,我们把上面的代码模拟超时异常这段放开,用debug模式进行,继续发送前面一样的请求,可以看到seata_order库中的undo_log,如下图:
undo_log
等超时后,事务进行了回滚。
总结
可以看到,本篇文章介绍的场景跟上篇差不多,本质都是基于undo_log的交易补偿,这也是AT模式的特点。但是本文的环境比上节复杂很多,这并不是seata本身造成的,而是微服务的拆分带来的系统架构的复杂性。