安装Seata
使用docker安装seata前提,client端(业务端)需要新增表undo_log,具体根据数据库类型来
MySQL为:
-- for AT mode you must to init this sql for you business database. the seata server not need it. CREATE TABLE IF NOT EXISTS `undo_log` ( `branch_id` BIGINT NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime', `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
seata-server端 需要新建数据库seata(这里名字要跟seata-server application.yml配置文件中store的db保持一致) 并创建下面4张表
MySQL为:
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_status_gmt_modified` (`status` , `gmt_modified`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking', `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_status` (`status`), KEY `idx_branch_id` (`branch_id`), KEY `idx_xid_and_branch_id` (`xid` , `branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `distributed_lock` ( `lock_key` CHAR(20) NOT NULL, `lock_value` VARCHAR(20) NOT NULL, `expire` BIGINT, primary key (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
下载镜像:
docker pull seataio/seata-server:1.5.2
docker run --name seata-server -p 8091:8091 -p 7091:7091 -d seataio/seata-server:1.5.2
将容器内部resources文件夹拷贝出来并修改:
mkdir /home/seata/resources -p docker cp seata-server:/seata-server/resources /home/seata/ cd /home/seata/resources vim application.yml
参考application.example.yml修改application.yml的 config,registry,store具体如下:
seata: config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: server-addr: 192.168.233.128:8848 namespace: group: SEATA_GROUP username: nacos password: nacos registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos nacos: application: seata-server server-addr: 192.168.233.128:8848 group: SEATA_GROUP namespace: cluster: default username: nacos password: nacos store: # support: file 、 db 、 redis mode: db db: datasource: druid db-type: mysql driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true user: root password: root min-conn: 5 max-conn: 100 global-table: global_table branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock query-limit: 100 max-wait: 5000
重新挂载配置文件,需要先移除容器再使用挂在配置启动容器:
docker stop seata-server docker rm seata-server docker run --name seata-server -p 8091:8091 -p 7091:7091 -v /home/seata/resources:/seata-server/resources -d seataio/seata-server:1.5.2
查看seata启动控制台 docker logs seata-server
至此,nacos上就会注册seata服务
业务端使用seata控制全局事务
依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.5.2</version> </dependency>
所有微服务配置文件需要添加
seata: enabled: true tx-service-group: imooc_tx_group service: vgroup-mapping: imooc_tx_group: SEATA_GROUP grouplist: SEATA_GROUP: 192.168.233.128:8091 config: nacos: server-addr: 192.168.233.128:8848 username: nacos password: nacos registry: nacos: server-addr: 192.168.233.128:8848 username: nacos password: nacos
调用方serviceImpl方法上添加 @GlobalTransactional,并且一旦远程调用失败,则手动回滚全局事务
// 发起远程调用,初始化用户简历,新增一条空记录 GraceJSONResult graceJSONResult = workMicroServiceFeign.init(user.getId()); if (graceJSONResult.getStatus() != 200) { // 如果调用状态不是200,则手动回滚全局事务 String xid = RootContext.getXID(); if (StringUtils.isNotBlank(xid)) { try { GlobalTransactionContext.reload(xid).rollback(); } catch (TransactionException e) { e.printStackTrace(); } finally { GraceException.display(ResponseStatusEnum.USER_REGISTER_ERROR); } } }
或者使用切面来控制全局事务
package com.imooc.api; import io.seata.core.context.RootContext; import io.seata.core.exception.TransactionException; import io.seata.tm.api.GlobalTransaction; import io.seata.tm.api.GlobalTransactionContext; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.AfterThrowing; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.springframework.stereotype.Component; @Slf4j @Component @Aspect public class SeataTransactionAspect { /** * 调用service之前,手动加入或者创建全局事务 * @param joinPoint * @throws TransactionException */ @Before("execution(* com.imooc.service.impl..*.*(..))") public void beginTransaction(JoinPoint joinPoint) throws TransactionException { log.info("手动开启全局事务"); // 手动开启全局事务 GlobalTransaction gt = GlobalTransactionContext.getCurrentOrCreate(); gt.begin(); } /** * 捕获异常,则手动回滚全局事务 * @param throwable * @throws Throwable */ @AfterThrowing( throwing = "throwable", pointcut = "execution(* com.imooc.service.impl..*.*(..))" ) public void seataRollback(Throwable throwable) throws Throwable { log.info("捕获到异常信息,则回滚,异常信息为:" + throwable.getMessage()); // 从当前线程获得xid String xid = RootContext.getXID(); if (StringUtils.isNotBlank(xid)) { GlobalTransactionContext.reload(xid).rollback(); } } }