一.Seata简介
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双 11,对各 BU 业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助其技术更加可靠与完备。
二.Seata安装(Windows安装)
1.下载地址:下载地址 博主这里使用的是v0.9.0
2.下载后需要对对应文件尽心修改
a.file.conf修改
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #vgroup->rgroup vgroup_mapping.my_test_tx_group = "fsp_ex_group" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" #若报配置异常则加上下面一句配置即可 disableGlobalTransaction=false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } ## transaction log store store { ## store mode: file、db mode = "db" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://localhost:3306/seata" user = "root" password = "123456" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } }
b.registry.conf修改
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "file" nacos { serverAddr = "localhost" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "localhost" namespace = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
3.创建seata数据库(mysql) 将seata->conf->db_store.sql放到数据库中运行即可
4.到bin目录下找到seata-server.bat运行即可 如下即为成功运行:
三.nacos安装
a.下载地址:https://github.com/alibaba/nacos/releases
b.修改application.properties文件(将数据库对应配置注释打开即可)
c.找到startup.cmd编辑里面的将set MODE="cluster"一行改成set MODE=“standalone”
d.点击startup.cmd运行即可,如下即为成功:
e.访问localhost:8848即可 默认账号密码都是:nacos
四.项目案例
1.项目结构
account-service:负责对用户余扣减
business-service:负责完成下单的逻辑,包含 2 个主要的步骤,就是对库存服务和订单服务的远程调用
order-service:负责完成保存用户订单的操作
storage-service:负责完成对库存的扣减
2.各模块主要代码展示:
account-service
@Service public class AccountServiceImpl implements AccountService { @Autowired private AccountTblMapper accountTblMapper ; private static Logger logger = LoggerFactory.getLogger(AccountServiceImpl.class) ; @Transactional @Override public void debit(String userId, int money) { logger.info("开始扣减用户:{}的余额,数量为:{}",userId ,money); //1 查询数据库里面的账户 AccountTbl accountTbl = accountTblMapper.selectOne(new LambdaQueryWrapper<AccountTbl>(). eq(AccountTbl::getUserId, userId)); if(accountTbl==null){ throw new IllegalArgumentException("此账号不存在") ; } //2 扣减的操作 int idleMoney = accountTbl.getMoney() - money ; // 3 库存的校验 if(idleMoney<0){ throw new RuntimeException("库存不足") ; } //4 设置到账户里面 accountTbl.setMoney(idleMoney); //5 保存到数据库里里面 accountTblMapper.updateById(accountTbl) ; if("SXT_USER_2".equals(userId)){ throw new RuntimeException("不想成功") ; } } }
####Pom文件
<dependencies> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!--MySQL依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> </dependencies>
business-service
@Service public class BusinessServiceImpl implements BusinessService { private static Logger logger = LoggerFactory.getLogger(BusinessServiceImpl.class) ; @Autowired private StorageServiceApi storageServiceApi ; @Autowired private OrderServiceApi orderServiceApi ; @Override public void purchase(String userId, String productNo, int count) { logger.info("准备下单,用户{},商品{},数量{}",userId ,productNo ,count); // 1 远程调用库存服务--> 库存的扣减f storageServiceApi.deduct(productNo, count); // 2 远程调用订单服务-->订单的新增 orderServiceApi.create(userId, productNo, count); logger.info("下单成功"); } }
####通过feign调用 代码:
OrderServiceApi
@Service public class OrderServiceApi { private static Logger logger = LoggerFactory.getLogger(OrderServiceApi.class) ; // @Autowired // private RestTemplate restTemplate ; @Autowired private OrderServiceFeign orderServiceFeign ; public void create(String userId ,String productNo ,int count){ // ResponseEntity<Void> responseEntity = restTemplate.getForEntity( // "http://order-service/create/{userId}/{productNo}/{count}", // Void.class, // userId, // productNo, // count // ); ResponseEntity<Void> responseEntity = orderServiceFeign.create(userId, productNo, count); if(responseEntity.getStatusCode()== HttpStatus.OK){ logger.info("远程调用订单服务成功"); return; } throw new RuntimeException("远程调用订单服务失败") ; } }
StorageServiceApi
@Service public class StorageServiceApi { private static Logger logger = LoggerFactory.getLogger(StorageServiceApi.class) ; // @Autowired // private RestTemplate restTemplate ; //ribbon @Autowired private StorageServiceFeign storageServiceFeign ; public void deduct(String productNo ,int count){ // ResponseEntity<Void> responseEntity = restTemplate.getForEntity( // "http://storage-service/deduct/{productNo}/{count}", // Void.class, // productNo, // count // ); ResponseEntity<Void> responseEntity = storageServiceFeign.deduct(productNo, count); if(responseEntity.getStatusCode()== HttpStatus.OK){ logger.info("远程调用库存服务成功"); return; } throw new RuntimeException("远程调用库存服务失败"); } }
###order-service
@Service public class OrderServiceImpl implements OrderService { @Autowired private OrderTblMapper orderTblMapper ; @Autowired private AccountServiceApi accountServiceApi ; private static Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class) ; @Transactional @Override public OrderTbl createOrder(String userId, String productNo, int count) { logger.info("开始创建一个订单,用户为{},商品为{},数量为{}",userId,productNo,count); // 1 扣减用户的余额 account-service - >rpc int money = cal(productNo,count) ; ResponseEntity<Void> responseEntity = accountServiceApi.debit(userId, money); if(responseEntity.getStatusCode()!= HttpStatus.OK){ throw new RuntimeException("远程扣减用户的余额失败") ; } //2 写本地的订单表 OrderTbl orderTbl = new OrderTbl(); orderTbl.setUserId(userId); orderTbl.setCommodityCode(productNo); orderTbl.setCount(count); orderTblMapper.insert(orderTbl) ; logger.info("创建订单成功,用户为{},商品为{},数量{}",userId,productNo,count); return orderTbl; } /** * 计算商品的总价格 * @param productNo * @param count * @return */ private int cal(String productNo, int count) { //我们的数据库里面非常简单,没有商品的表 int money = 0 ; if("HUAWEI_0001".equals(productNo)){ money = 2000 ; }else if("XIAOMI_002".equals("productNo")){ money = 1000 ; }else { money = 9999 ; } return money * count ; } }
###storage-service
@Service public class StorageServiceImpl implements StorageService { @Autowired private StorageTblMapper storageTblMapper ; private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class) ; @Transactional @Override public void deduct(String productNo, int count) { logger.info("开始扣减商品{}的库存,数量为{}",productNo,count); //1 查询数据库里面该商品的库存 StorageTbl storageTbl = storageTblMapper.selectOne(new LambdaQueryWrapper<StorageTbl>() .eq(StorageTbl::getCommodityCode, productNo)); if(storageTbl==null){ throw new IllegalArgumentException("商品不存在") ; } // 2 扣减操作 int idleCount = storageTbl.getCount() - count ; if(idleCount< 0 ){ throw new RuntimeException("库存不足") ; } //3 设置到商品的库存里面 storageTbl.setCount(idleCount); // 4 保存到数据库里面 storageTblMapper.updateById(storageTbl) ; logger.info("扣减商品{}的库存成功,剩余的库存为{}" ,productNo, idleCount); } }
五.测试(其中的数据表在项目下seata.sql 放到之前安装seata时创建的seata数据库运行即可)
1.启动四个服务(若有改动后重启只需重启order-service ,business-service)
2.还原数据库数据
3.正常测试:
4.异常回滚测试: