全网首发:Seata Saga状态机设计器实战(下)

简介: 全网首发:Seata Saga状态机设计器实战(下)

三、状态机的使用


总体例子还是参考官网的local-saga项目,在其基础上做了一些补充完善。

16.png


3.1 seata属性配置

seata.enabled=true
seata.tx-service-group=local_saga
seata.service.vgroup-mapping.local_saga=seata-server
seata.service.grouplist.seata-server=127.0.0.1:8091

3.2 状态机json配置

15.png

将设计好的Saga状态机的json文件导出,存放在/resource/statelang/目录下。


3.3 配置状态机

/**
 * @Description 状态机配置
 * @Date 2021/5/12 11:03 上午
 * @Version 1.0
 * @Copyright 2019-2021
 */
@Configuration
public class StateMachineEngineConfig{
    @Autowired
    DataSource dataSource;
    @Bean
    public DbStateMachineConfig dbStateMachineConfig(){
        DbStateMachineConfig stateMachineConfig = new DbStateMachineConfig();
        stateMachineConfig.setDataSource(dataSource);
       // Resource resource = new ClassPathResource("statelang/reduce_inventory_and_balance.json");
        Resource resource = new ClassPathResource("statelang/reduce.json");
        stateMachineConfig.setResources(new Resource[]{resource});
        stateMachineConfig.setEnableAsync(true);
        stateMachineConfig.setThreadPoolExecutor(threadExecutor());
        return stateMachineConfig;
    }
      @Bean
      public ProcessCtrlStateMachineEngine stateMachineEngine(){
          ProcessCtrlStateMachineEngine processCtrlStateMachineEngine = new ProcessCtrlStateMachineEngine();
          processCtrlStateMachineEngine.setStateMachineConfig(dbStateMachineConfig());
          return processCtrlStateMachineEngine;
      }
    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder(){
        StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
        engineHolder.setStateMachineEngine(stateMachineEngine());
        return engineHolder;
    }
    @Bean
    public ThreadPoolExecutor threadExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(1);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("SAGA_ASYNC_EXE_");
        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
}

3.4 下订单接口

@RestController
@Slf4j
public class OrderController {
    @Autowired
    StateMachineEngine stateMachineEngine;
    /**
     * 用户用这个路径进行访问:
     * http://localhost:8080/create
     * @return
     */
    @GetMapping("/create")
    public String create() {
        log.info("=========开始创建订单============");
        Map<String, Object> startParams = new HashMap<>(3);
        //唯一健
        String businessKey = String.valueOf(System.currentTimeMillis());
        startParams.put("businessKey", businessKey);
        startParams.put("count", 10);
        startParams.put("amount", new BigDecimal("400"));
        //同步执行
        StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
        if(ExecutionStatus.SU.equals(inst.getStatus())){
            log.info("创建订单成功,saga transaction execute Succeed. XID: " + inst.getId());
            return "创建订单成功";
        }else{
            log.info("创建订单失败 ,saga transaction execute failed. XID: " + inst.getId());
            return "创建订单失败";
        }
    }
}


3.5 启动项目

14.png

初始化TM,TM注册成功。

初始化RM,RM注册成功


3.6 发起下订单请求

发起请求:http://localhost:8080/create

13.png

12.png


说明:

扣减库存服务InventoryAction和扣减余额服务ReduceBalance都成功,成功创建订单。


3.7 模拟扣减余额失败

在BalanceActionImpl类扣减余额方法中,主动抛出异常。

@Override
@Transactional(rollbackFor = Exception.class)
public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
    if(params != null && "true".equals(params.get("throwException"))){
        throw new RuntimeException("reduce balance failed");
    }
    if (Math.random() < 0.9999) {
        throw new RuntimeException("模拟随机异常,扣减账户余额失败");
    }
    LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
    return true;
}


发起请求:http://localhost:8080/create

11.png

9.png

10.png


说明:

1、扣减库存动作执行成功

2、扣减余额失败,开始进行重试,重试次数3次

3、重试结束,扣减余额失败,异常被catch动作捕获,触发补偿触发节点CompensationTrigger的事件。

4、触发compenstate状态补偿事件CompensateReduceBalance和CompensateReduceInventory

5、状态机引擎执行结束,创建订单失败


四、遇到的问题记录


问题一:

io.seata.saga.engine.exception.EngineExecutionException: State[Compensation1] is not exist

状态节点的补偿状态节点不存在。

8.png

1、ServiceTask节点一定配置CompensateState属性,指定状态补偿节点。

2、Compensation节点属性中一定要包含 “Type”: "Compensation"属性


问题二:

Caused by: java.lang.IllegalStateException: No node

7.png

根据异常日志,判定是choice判定节点的配置出现异常。

原因:

没有对Choice节点的发散出去的箭头上的判断添加进行重新配置。

这样的默认条件,状态机引擎无法识别下一个节点。

注意:

Expression中的判断条件正确

Next指定下一个节点的id

Default指定默认的下一个节点的id,值类型是字符串。

6.png


问题三:

Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String

at io.seata.saga.statelang.parser.impl.ChoiceStateParser.parse(ChoiceStateParser.java:55)

5.png

choice节点属性:Default属性要指定下个节点的名称,不能使用boolean类型。

4.png


问题四:

Caused by: java.lang.RuntimeException: ‘Catch’ node[053ac3ac] is not attached on a ‘ServiceTask’ or ‘ScriptTask’


Catch节点一定要放在ServiceTask节点上,这样就会自动和ServiceTask进行关联。而不能采用带箭头线条进行关联。

3.png



问题五:

InventoryActionImpl.reduce执行失败不停重试的问题

模拟InventoryActionImpl.reduce执行失败后,扣减库存动作不停的重试。

@Override
@Transactional(rollbackFor = Exception.class)
public boolean reduce(String businessKey, int count) {
    LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
    if (Math.random() < 0.9999) {
        throw new RuntimeException("模拟随机异常,扣减库存失败");
    }
    return true;
}

2.png


改进:

针对InventoryAction节点,也添加异常捕获。

1.png


问题六:

服务端一直出现:Failed to commit SAGA global[172.30.192.25:8091:138343622889181184], will retry later.

这个主要是由于InventoryAction节点抛出了异常,但是没有捕获处理,导致分支事务的状态上报一直失败。

导致TC一直发起globalstatus commitrestrying。

解决:

1、停止seata server服务

2、rm -fr sessionStore

3、重启服务

4、状态机的任务节点都配置异常捕获

5、重启项目再次发起请求。


五、项目地址


项目地址:https://github.com/StarlightWANLI/local-saga.git


总结


业界公认 Saga 是作为长事务的解决方案,而要使用好Saga熟练使用状态机设计器设计相应业务流程执行的状态机引擎至关重要。

希望通过本文大家能掌握Saga状态机设计器的基本使用。

目录
相关文章
|
存储 NoSQL Nacos
Seata分布式事务实战 2
Seata分布式事务实战
161 0
|
存储 SpringCloudAlibaba Java
Seata分布式事务实战 1
Seata分布式事务实战
128 0
|
7月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
141 0
|
7月前
|
开发框架 Java 数据库连接
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(下)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
114 0
|
存储 SQL Java
|
SQL 前端开发 Java
有来实验室|第一篇:Seata1.5.2版本部署和开源全栈商城订单支付业务实战(二)
有来实验室|第一篇:Seata1.5.2版本部署和开源全栈商城订单支付业务实战(二)
|
NoSQL 关系型数据库 MySQL
有来实验室|第一篇:Seata1.5.2版本部署和开源全栈商城订单支付业务实战(一)
有来实验室|第一篇:Seata1.5.2版本部署和开源全栈商城订单支付业务实战(一)
|
SQL JSON Java
Seata分布式事务模式(TA、TCC、XA、SAGA)工作机制
分布式应用有一个比较明显的问题就是,一个业务流程通常需要几个服务来完成,业务的一致性很难保证。为了保障业务一致性,每一步都要在 catch 里去处理前面所有的“回滚”操作,可读性及维护性差,开发效率低下。
468 0
|
安全 容灾 Nacos
Seata中的四种不同的事务模式之一SAGA模式
Saga 模式是 Seata 即将开源的长事务解决方案,由蚂蚁金服主要贡献
266 0
|
存储 SQL SpringCloudAlibaba
十一.SpringCloudAlibaba极简入门-分布式事务实战seata
在单体应用中通常情况下只有一个数据库(单数据源),集成事务是一个非常容易的工作。Spring对事务做了很好的管理,我们只需要通过简单的注解@Transactional就可以完成本地事务管理。 但是在微服务项目中事务的管理变得困难,因为微服务项目往往有很多的数据库组成,如果在一个业务中涉及到了对多个微服务以及多个数据库的写操作(跨多个数据源),那么要如何才能保证多个数据库组件的读写一致呢?即:同时操作两个数据库,数据库A写操作成功过,数据库B写操作失败要怎么样让数据库A的写操作回滚?很显然用本地事务管理是不能实现了。 我们知道,虽然Spring对事务做了很好的管理和封装,但是最终都是调用数据
下一篇
DataWorks