三、状态机的使用
总体例子还是参考官网的local-saga项目,在其基础上做了一些补充完善。
3.1 seata属性配置
seata.enabled=true seata.tx-service-group=local_saga seata.service.vgroup-mapping.local_saga=seata-server seata.service.grouplist.seata-server=127.0.0.1:8091
3.2 状态机json配置
将设计好的Saga状态机的json文件导出,存放在/resource/statelang/目录下。
3.3 配置状态机
/** * @Description 状态机配置 * @Date 2021/5/12 11:03 上午 * @Version 1.0 * @Copyright 2019-2021 */ @Configuration public class StateMachineEngineConfig{ @Autowired DataSource dataSource; @Bean public DbStateMachineConfig dbStateMachineConfig(){ DbStateMachineConfig stateMachineConfig = new DbStateMachineConfig(); stateMachineConfig.setDataSource(dataSource); // Resource resource = new ClassPathResource("statelang/reduce_inventory_and_balance.json"); Resource resource = new ClassPathResource("statelang/reduce.json"); stateMachineConfig.setResources(new Resource[]{resource}); stateMachineConfig.setEnableAsync(true); stateMachineConfig.setThreadPoolExecutor(threadExecutor()); return stateMachineConfig; } @Bean public ProcessCtrlStateMachineEngine stateMachineEngine(){ ProcessCtrlStateMachineEngine processCtrlStateMachineEngine = new ProcessCtrlStateMachineEngine(); processCtrlStateMachineEngine.setStateMachineConfig(dbStateMachineConfig()); return processCtrlStateMachineEngine; } @Bean public StateMachineEngineHolder stateMachineEngineHolder(){ StateMachineEngineHolder engineHolder = new StateMachineEngineHolder(); engineHolder.setStateMachineEngine(stateMachineEngine()); return engineHolder; } @Bean public ThreadPoolExecutor threadExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(1); //配置最大线程数 executor.setMaxPoolSize(20); //配置队列大小 executor.setQueueCapacity(99999); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("SAGA_ASYNC_EXE_"); // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor.getThreadPoolExecutor(); } }
3.4 下订单接口
@RestController @Slf4j public class OrderController { @Autowired StateMachineEngine stateMachineEngine; /** * 用户用这个路径进行访问: * http://localhost:8080/create * @return */ @GetMapping("/create") public String create() { log.info("=========开始创建订单============"); Map<String, Object> startParams = new HashMap<>(3); //唯一健 String businessKey = String.valueOf(System.currentTimeMillis()); startParams.put("businessKey", businessKey); startParams.put("count", 10); startParams.put("amount", new BigDecimal("400")); //同步执行 StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams); if(ExecutionStatus.SU.equals(inst.getStatus())){ log.info("创建订单成功,saga transaction execute Succeed. XID: " + inst.getId()); return "创建订单成功"; }else{ log.info("创建订单失败 ,saga transaction execute failed. XID: " + inst.getId()); return "创建订单失败"; } } }
3.5 启动项目
初始化TM,TM注册成功。
初始化RM,RM注册成功
3.6 发起下订单请求
发起请求:http://localhost:8080/create
说明:
扣减库存服务InventoryAction和扣减余额服务ReduceBalance都成功,成功创建订单。
3.7 模拟扣减余额失败
在BalanceActionImpl类扣减余额方法中,主动抛出异常。
@Override @Transactional(rollbackFor = Exception.class) public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) { if(params != null && "true".equals(params.get("throwException"))){ throw new RuntimeException("reduce balance failed"); } if (Math.random() < 0.9999) { throw new RuntimeException("模拟随机异常,扣减账户余额失败"); } LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey); return true; }
发起请求:http://localhost:8080/create
说明:
1、扣减库存动作执行成功
2、扣减余额失败,开始进行重试,重试次数3次
3、重试结束,扣减余额失败,异常被catch动作捕获,触发补偿触发节点CompensationTrigger的事件。
4、触发compenstate状态补偿事件CompensateReduceBalance和CompensateReduceInventory
5、状态机引擎执行结束,创建订单失败
四、遇到的问题记录
问题一:
io.seata.saga.engine.exception.EngineExecutionException: State[Compensation1] is not exist
状态节点的补偿状态节点不存在。
1、ServiceTask节点一定配置CompensateState属性,指定状态补偿节点。
2、Compensation节点属性中一定要包含 “Type”: "Compensation"属性
问题二:
Caused by: java.lang.IllegalStateException: No node
根据异常日志,判定是choice判定节点的配置出现异常。
原因:
没有对Choice节点的发散出去的箭头上的判断添加进行重新配置。
这样的默认条件,状态机引擎无法识别下一个节点。
注意:
Expression中的判断条件正确
Next指定下一个节点的id
Default指定默认的下一个节点的id,值类型是字符串。
问题三:
Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
at io.seata.saga.statelang.parser.impl.ChoiceStateParser.parse(ChoiceStateParser.java:55)
choice节点属性:Default属性要指定下个节点的名称,不能使用boolean类型。
问题四:
Caused by: java.lang.RuntimeException: ‘Catch’ node[053ac3ac] is not attached on a ‘ServiceTask’ or ‘ScriptTask’
Catch节点一定要放在ServiceTask节点上,这样就会自动和ServiceTask进行关联。而不能采用带箭头线条进行关联。
问题五:
InventoryActionImpl.reduce执行失败不停重试的问题
模拟InventoryActionImpl.reduce执行失败后,扣减库存动作不停的重试。
@Override @Transactional(rollbackFor = Exception.class) public boolean reduce(String businessKey, int count) { LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey); if (Math.random() < 0.9999) { throw new RuntimeException("模拟随机异常,扣减库存失败"); } return true; }
改进:
针对InventoryAction节点,也添加异常捕获。
问题六:
服务端一直出现:Failed to commit SAGA global[172.30.192.25:8091:138343622889181184], will retry later.
这个主要是由于InventoryAction节点抛出了异常,但是没有捕获处理,导致分支事务的状态上报一直失败。
导致TC一直发起globalstatus commitrestrying。
解决:
1、停止seata server服务
2、rm -fr sessionStore
3、重启服务
4、状态机的任务节点都配置异常捕获
5、重启项目再次发起请求。
五、项目地址
项目地址:https://github.com/StarlightWANLI/local-saga.git
总结
业界公认 Saga 是作为长事务的解决方案,而要使用好Saga熟练使用状态机设计器设计相应业务流程执行的状态机引擎至关重要。
希望通过本文大家能掌握Saga状态机设计器的基本使用。