springcloud+eureka整合阿里seata-saga模式

简介: springcloud+eureka整合阿里seata-saga模式

分布式事务saga实现的理论基础Hector&Kenneth在1987年发表的论文Sagas,它的核心思想是当整个事务的一个节点失败后,依赖于状态对当前事务从前到后进行重试,或者从后往前进行补偿。


saga模式的主要应用场景是业务流程比较长,有一些服务不能提供TCC模式的三个接口,或者不能实现AT模式的依赖undolog实现自动补偿。


阿里的seata中间件是通过状态机来实现的,它使用状态图定义服务调用流程并生成json状态语言定义文件,状态图的节点可以是一个服务,也可以是补偿节点。这个生成的json由状态机引擎来驱动执行,出现异常是状态机引擎对调用成功的服务从后往前补偿,而补偿的逻辑需要由服务自己来实现。


本文我们还是用之前TCC模式中的例子,我们在电商网站购买一件商品,后台首先会从订单服务下单,然后订单服务会调用账户服务扣减商品金额,如果成功,再调用库存服务扣减库存。如果其中某一步失败,则从后往前依次补偿,这个补偿事件由状态机触发。


配置状态机


首先我们需要创建3张表,sql语句如下,注意,本文使用的mysql:

create table seata_state_machine_def
(
    id               varchar(32)  not null comment 'id',
    name             varchar(128) not null comment 'name',
    tenant_id        varchar(32)  not null comment 'tenant id',
    app_name         varchar(32)  not null comment 'application name',
    type             varchar(20) comment 'state language type',
    comment_         varchar(255) comment 'comment',
    ver              varchar(16)  not null comment 'version',
    gmt_create       timestamp(3)    not null comment 'create time',
    status           varchar(2)   not null comment 'status(AC:active|IN:inactive)',
    content          longtext comment 'content',
    recover_strategy varchar(16) comment 'transaction recover strategy(compensate|retry)',
    primary key (id)
);
CREATE TABLE seata_state_machine_inst
(
    id                  VARCHAR(128) NOT NULL COMMENT 'id',
    machine_id          VARCHAR(32) NOT NULL COMMENT 'state machine definition id',
    tenant_id           VARCHAR(32) NOT NULL COMMENT 'tenant id',
    parent_id           VARCHAR(128) COMMENT 'parent id',
    gmt_started         TIMESTAMP(3)   NOT NULL COMMENT 'start time',
    business_key        VARCHAR(48) COMMENT 'business key',
    start_params        LONGTEXT COMMENT 'start parameters',
    gmt_end             TIMESTAMP(3) COMMENT 'end time',
    excep               BLOB COMMENT 'exception',
    end_params          LONGTEXT COMMENT 'end parameters',
    STATUS              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    compensation_status VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    is_running          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    gmt_updated         TIMESTAMP(3)   NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY unikey_buz_tenant (business_key, tenant_id)
);
CREATE TABLE seata_state_inst
(
    id                       VARCHAR(48)  NOT NULL COMMENT 'id',
    machine_inst_id          VARCHAR(128)  NOT NULL COMMENT 'state machine instance id',
    NAME                     VARCHAR(128) NOT NULL COMMENT 'state name',
    TYPE                     VARCHAR(20) COMMENT 'state type',
    service_name             VARCHAR(128) COMMENT 'service name',
    service_method           VARCHAR(128) COMMENT 'method name',
    service_type             VARCHAR(16) COMMENT 'service type',
    business_key             VARCHAR(48) COMMENT 'business key',
    state_id_compensated_for VARCHAR(50) COMMENT 'state compensated for',
    state_id_retried_for     VARCHAR(50) COMMENT 'state retried for',
    gmt_started              TIMESTAMP(3)    NOT NULL COMMENT 'start time',
    is_for_update            TINYINT(1) COMMENT 'is service for update',
    input_params             LONGTEXT COMMENT 'input parameters',
    output_params            LONGTEXT COMMENT 'output parameters',
    STATUS                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    excep                    BLOB COMMENT 'exception',
    gmt_end                  TIMESTAMP(3) COMMENT 'end time',
    PRIMARY KEY (id, machine_inst_id)
);

本文电商购物的流程状态,我们用下图表示:

微信图片_20221212153235.png

注意:seata提供了下面地址可以绘制这个图,同时生成对应的json代码。本文的json代码是参考官方示例手工改写的。


http://seata.io/saga_designer/index.html#/

状态机需要依赖一个json,这个json定义了上面流程图中的节点,代码如下:

{
    "Name": "buyGoodsOnline",
    "Comment": "buy a goods on line, add order, deduct account, deduct storage ",
    "StartState": "SaveOrder",
    "Version": "0.0.1",
    "States": {
        "SaveOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "saveOrder",
            "CompensateState": "DeleteOrder",
            "Next": "ChoiceAccountState",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ],
            "Output": {
                "SaveOrderResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            }
        },
        "ChoiceAccountState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[SaveOrderResult] == true",
                    "Next":"ReduceAccount"
                }
            ],
            "Default":"Fail"
        },
        "ReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceAccount",
            "Next": "ChoiceStorageState",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]",
                {
                    "throwException" : "$.[mockReduceAccountFail]"
                }
            ],
            "Output": {
                "ReduceAccountResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ]
        },
        "ChoiceStorageState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[ReduceAccountResult] == true",
                    "Next":"ReduceStorage"
                }
            ],
            "Default":"Fail"
        },
        "ReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceStorage",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]",
                {
                    "throwException" : "$.[mockReduceStorageFail]"
                }
            ],
            "Output": {
                "ReduceStorageResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "DeleteOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "deleteOrder",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ]
        },
        "CompensateReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]"
            ]
        },
        "CompensateReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type":"Succeed"
        },
        "Fail": {
            "Type":"Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}

上面的json中,我们定义了6个ServiceTask,分别对应订单服务保存订单、账户服务扣减账户和库存服务扣减库存以及对应的补偿机制。


我们定义了CompensationTrigger,并且在账户服务和库存服务抛出异常后,会调用CompensationTrigger来触发补偿事件。


对于每一个节点,我们需要定义type,同时对于ServiceTask类型的节点,我们需要定义触发方法,回滚事件对应的ServiceTask,下一个流程节点,输入/输出参数、异常等。


订单服务


订单服务是本文中的集成服务,它会调用账户服务和库存服务来实现业务。


首先,前端发起下单请求后,订单服务会接收这个服务,然后启动状态机,这个代码在OrderServiceImpl,代码如下:

public boolean create(Order order) {
    LOGGER.info("------->交易开始");
    StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");
    Map<String, Object> startParams = new HashMap<>(3);
    String businessKey = String.valueOf(System.currentTimeMillis());
    startParams.put("businessKey", businessKey);
    startParams.put("order", order);
    startParams.put("mockReduceAccountFail", "true");
    startParams.put("userId", order.getUserId());
    startParams.put("money", order.getPayAmount());
    startParams.put("productId", order.getProductId());
    startParams.put("count", order.getCount());
    //这里状态机是同步的,seata也支持异步,可以参考官方示例
    StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);
    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
    System.out.println("saga transaction commit succeed. XID: " + inst.getId());
    inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstanceByBusinessKey(businessKey, null);
    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
    return true;
}

保存账单OrderSaveImpl对应json里面的orderSave,里面的回滚方法就是deleteOrder,代码如下:

public class OrderSaveImpl implements OrderApi{
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Resource
    private OrderDao orderDao;
    @Override
    public boolean saveOrder(String businessKey, Order order) {
        logger.info("保存订单, businessKey:{}, order: {}", businessKey, order);
        orderDao.create(order);
        return true;
    }
    /**
     * 回滚事务,删除订单
     * @param order order
     * @return
     */
    @Override
    public boolean deleteOrder(String businessKey,Order order){
        logger.info("删除订单, businessKey:{}, order: {}", businessKey, order);
        orderDao.delete(order);
        return true;
    }
}

定义调用账户服务,这里使用feign调用,补偿方法是compensateDecrease,对应json文件中的accountService,代码如下:

public class AccountServiceImpl implements AccountService{
    @Resource
    private AccountApi accountApi;
    @Override
    public boolean decrease(String businessKey, Long userId, BigDecimal money) {
        return accountApi.decrease(businessKey, userId, money);
    }
    @Override
    public boolean compensateDecrease(String businessKey, Long userId, BigDecimal money) {
        return accountApi.compensateDecrease(businessKey, userId, money);
    }
}

定义调用库存服务,这里使用feign调用,补偿方法是compensateDecrease,对应json文件中的storageService,代码如下:

public class StorageServiceImpl implements StorageService{
    @Resource
    private StorageApi storageApi;
    @Override
    public boolean decrease(String businessKey, Long productId, Integer count) {
        return storageApi.decrease(businessKey, productId, count);
    }
    @Override
    public boolean compensateDecrease(String businessKey, Long productId, Integer count) {
        return storageApi.compensateDecrease(businessKey, productId, count);
    }
}

库存服务


看了订单服务代码,再看库存服务就非常简单了,为订单服务提供http接口,包括2个方法,扣减库存和补偿扣减库存,controller调用的service代码如下:

@Service("storageService")
public class StorageServiceImpl implements StorageService {
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
    @Resource
    private StorageDao storageDao;
    @Override
    public boolean decrease(Long productId, Integer count) {
        LOGGER.info("扣减库存, commit, productId:{}, count:{}", productId, count);
        storageDao.decrease(productId, count);
        //throw new RuntimeException();
    }
    @Override
    public boolean compensateDecrease(Long productId, Integer count) {
        LOGGER.info("补偿扣减库存, compensate, productId:{}, count:{}", productId, count);
        storageDao.compensateDecrease(productId, count);
        return true;
    }
}

账户服务的代码跟这个类似,就不再贴代码了。


测试


本文的实验中,订单、账户、库存服务都有自己的数据库,这里不再贴sql了,需要了解的可以看我之前的文章《springcloud+eureka整合分布式事务中间件seata》,或者下载源代码,文末有源码地址。


开始实验之前,订单表没有数据,账户和库存数据如下:

account表:

微信图片_20221212153523.png

storage表:

微信图片_20221212153543.png

我们进行一次成功的实验,向下面的url发送购买商品请求:

http://localhost:8180/order/create
{
  "userId":1,
  "productId":1,
  "count":1,
  "money":1,
  "payAmount":50
}

成功之后发现账户表有了数据,账户表和库存表数据如下:

order表:

微信图片_20221212153619.png

account表:

微信图片_20221212153638.png

storage表:

微信图片_20221212153700.png

我们把库存服务的decrease方法改成如下:

public boolean decrease(Long productId, Integer count) {
    LOGGER.info("扣减库存, commit, productId:{}, count:{}", productId, count);
  storageDao.decrease(productId, count);
    throw new RuntimeException();
}

这时发送购买商品请求后会抛出异常,然后3个服务的事务依次做交易补偿,所有表数据没有变。


总结


seata中的saga模式适用于长流程或者长事务的场景。而saga模式复杂的地方在于引入了状态机,需要定义状态机的流程,把定义好的流程用json文件引入工程中。


同时saga模式需要开发者自己定义回滚事件,如果回滚失败,对整个事务的控制就非常复杂了。

相关文章
|
8月前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
282 50
|
6月前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
426 1
|
7月前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
177 5
|
10月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
10月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
11月前
|
人工智能 前端开发 Java
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
本文介绍了如何使用 **Spring Cloud Alibaba AI** 构建基于 Spring Boot 和 uni-app 的聊天机器人应用。主要内容包括:Spring Cloud Alibaba AI 的概念与功能,使用前的准备工作(如 JDK 17+、Spring Boot 3.0+ 及通义 API-KEY),详细实操步骤(涵盖前后端开发工具、组件选择、功能分析及关键代码示例)。最终展示了如何成功实现具备基本聊天功能的 AI 应用,帮助读者快速搭建智能聊天系统并探索更多高级功能。
3520 2
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
|
10月前
|
人工智能 前端开发 Java
Spring Cloud Alibaba AI,阿里AI这不得玩一下
🏀闪亮主角: 大家好,我是JavaDog程序狗。今天分享Spring Cloud Alibaba AI,基于Spring AI并提供阿里云通义大模型的Java AI应用。本狗用SpringBoot+uniapp+uview2对接Spring Cloud Alibaba AI,带你打造聊天小AI。 📘故事背景: 🎁获取源码: 关注公众号“JavaDog程序狗”,发送“alibaba-ai”即可获取源码。 🎯主要目标:
296 0
|
9月前
|
负载均衡 算法 Nacos
SpringCloud 微服务nacos和eureka
SpringCloud 微服务nacos和eureka
161 0
|
11月前
|
缓存 Java Maven
SpringCloud基于Eureka的服务治理架构搭建与测试:从服务提供者到消费者的完整流程
Spring Cloud微服务框架中的Eureka是一个用于服务发现和注册的基础组件,它基于RESTful风格,为微服务架构提供了关键的服务注册与发现功能。以下是对Eureka的详细解析和搭建举例。
161 0
|
8月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。