在分布式系统架构成为互联网应用标配的今天,我们始终绕不开一个核心矛盾:数据一致性与系统可用性的平衡。CAP定理明确告诉我们,在分区容错性(Partition Tolerance)必须满足的分布式环境下,我们无法同时实现强一致性(Consistency)和高可用性(Availability)。而传统基于ACID模型的刚性事务,在跨节点、跨服务的分布式场景下,会出现性能急剧下降、可用性大幅降低的问题,根本无法支撑高并发、大流量的互联网业务。
正是在这样的背景下,BASE理论应运而生。它不是对ACID的否定,而是在分布式场景下对事务模型的补充与优化,通过牺牲强一致性换取系统的高可用性与横向扩展能力,最终通过最终一致性保障数据的正确性。本文将从底层逻辑出发,彻底拆解BASE理论的核心概念,详解最终一致性的实现原理,结合可落地的柔性事务设计方案与代码实例,帮你彻底吃透BASE理论,解决分布式事务的核心痛点。
一、BASE理论的核心定义与底层逻辑
BASE理论由eBay架构师Dan Pritchett于2008年在ACM发表的《BASE: An Acid Alternative》论文中正式提出,是大规模分布式系统架构设计的核心理论之一,至今仍是互联网分布式事务设计的核心指导思想。
BASE是三个核心短语的缩写:Basically Available(基本可用)、Soft State(软状态)、Eventually Consistent(最终一致性)。接下来我们逐一拆解每个核心概念的底层逻辑与落地边界。
1.1 Basically Available(基本可用)
基本可用的核心定义是:当分布式系统出现不可预知的故障时,允许系统损失部分非核心功能的可用性,保障核心业务链路的正常运行,而非整个系统完全不可用。
这里要明确两个核心误区:
- 基本可用≠系统不可用:核心业务链路必须保持可用,只是允许非核心功能降级。
- 基本可用≠性能无底线:允许响应时间适当延长,但必须在用户可接受的范围内。
举个通俗的例子:电商大促期间,系统流量峰值达到平时的10倍,此时我们可以关闭商品评价、个性化推荐、积分商城等非核心接口,将所有服务器资源集中到下单、支付、库存扣减等核心链路上,保障用户能正常完成购物流程,这就是典型的基本可用的落地。
基本可用的核心实现手段包括:服务降级、流量控制、熔断隔离、过载保护、分级响应等,其核心目标是在极端场景下,保住系统的核心生命线。
1.2 Soft State(软状态)
软状态也被称为中间状态,核心定义是:允许系统中的数据存在中间状态,且该中间状态不会影响系统的整体可用性,允许数据在多个节点副本之间的同步存在延迟。
与之相对的是ACID模型中的硬状态:ACID事务要求数据在任何时刻都必须处于一致的状态,事务执行过程中的中间状态对外不可见,要么全成功,要么全回滚,不存在中间过渡状态。
而软状态的核心价值,就是打破了硬状态对性能的限制,允许系统在异步处理过程中存在短暂的数据不一致,从而实现系统的高可用与高并发。
举个例子:用户下单支付完成后,订单状态更新为“已支付”,但物流系统的发货状态、积分系统的积分到账状态,不需要和订单状态实时同步,可以处于“待处理”的中间状态,通过异步任务在几秒到几分钟内完成处理,这个“待处理”的状态就是软状态。
这里必须明确软状态的落地边界:
- 软状态必须有明确的生命周期:不能无限期处于中间状态,必须设置超时时间与兜底处理机制。
- 软状态不能影响核心业务的正确性:中间状态必须是可追溯、可回滚、可补偿的,不能出现数据丢失或错乱。
- 软状态必须对用户透明:用户不需要感知中间状态的存在,或者对中间状态有明确的预期,不能给用户造成业务异常的误解。
1.3 Eventually Consistent(最终一致性)
最终一致性是BASE理论的核心与最终目标,其核心定义是:系统中所有的数据副本,在经过一段时间的同步与处理后,最终能够达到一个完全一致的状态。
这里要纠正一个最常见的错误认知:最终一致性不是“数据最终随便一致”,更不是“放弃数据一致性”,而是放弃了“实时强一致性”,通过异步处理、重试补偿等机制,保障数据在最终时刻的正确性。
在分布式系统中,最终一致性的实现,本质上是通过时间换空间、换性能、换可用性,将实时的强一致性校验,转化为异步的最终一致性保障,从而大幅提升系统的并发能力与可用性。
1.3.1 最终一致性的五大核心模型
最终一致性不是一个模糊的概念,而是有明确的、可落地的一致性模型,由分布式系统领域的权威专家Douglas Terry在《Replicated Data Management for Mobile Computing》论文中提出,也是目前业界公认的最终一致性标准模型,我们从易到难逐一讲解:
① 单调读(Monotonic Reads)定义:如果一个用户从系统中读取到了某个数据的某个版本,那么后续该用户的所有读操作,都不会读取到比该版本更旧的数据。 通俗解释:用户看到订单状态是“已支付”,后续刷新页面,不会再看到“待支付”的状态。 这是最终一致性最基础的要求,也是用户体验的底线。
② 单调写(Monotonic Writes)定义:同一个用户发起的写操作,系统必须保证按照发起的顺序依次执行。 通俗解释:用户先修改收货地址,再修改订单金额,系统不会先执行修改金额,再执行修改地址,避免出现数据错乱。 单调写是分布式系统数据正确性的基础保障。
③ 读己之所写(Read Your Own Writes)定义:用户写入数据成功后,后续的读操作,一定能读取到自己刚刚写入的数据。 通俗解释:用户修改了昵称,提交成功后刷新页面,一定能看到新的昵称,而不是旧的昵称。 这是比单调读更高的一致性要求,也是绝大多数业务场景必须满足的底线。
④ 会话一致性(Session Consistency)定义:在用户的同一个会话周期内,系统保证满足“读己之所写”的一致性模型;会话结束后,最终一致性的时间窗口可以适当放宽。 通俗解释:用户在同一个APP登录会话中,自己的操作一定能立刻看到结果;退出登录后,其他用户看到该用户的数据,允许有短暂的延迟。 会话一致性是工业界最常用的最终一致性模型,平衡了用户体验与系统性能。
⑤ 因果一致性(Causal Consistency)定义:系统中存在因果关系的写操作,必须保证所有节点都按照因果顺序执行;没有因果关系的写操作,顺序可以不做强制要求。 通俗解释:用户先下单,再支付,再申请退款,这三个操作有明确的因果关系,所有节点必须按照这个顺序执行;而两个不同用户的下单操作,没有因果关系,执行顺序不做要求。 因果一致性是最终一致性中最强的模型,几乎接近强一致性,同时保留了分布式系统的性能与可用性优势。
1.3.2 最终一致性的时间窗口
最终一致性的核心参数,就是“不一致的时间窗口”,也就是从数据写入成功,到所有节点数据完全一致的时间间隔。 这个时间窗口的长短,取决于业务场景、系统架构、网络环境等多个因素,常见的时间窗口:
- 金融核心场景:毫秒级到秒级
- 电商交易场景:秒级到分钟级
- 非核心业务场景:分钟级到小时级 时间窗口越短,对系统的性能要求越高,可用性越低;时间窗口越长,系统性能越好,可用性越高,但用户体验与数据风险也会相应提升。落地时必须根据业务场景,找到最佳的平衡点。
二、BASE与ACID的核心区别与易混淆点辨析
很多开发者会误以为BASE是ACID的对立面,其实不然:ACID是传统关系型数据库中刚性事务的核心模型,追求的是强一致性;而BASE是分布式场景下柔性事务的核心模型,追求的是最终一致性,二者是互补关系,而非对立关系。
我们通过一个清晰的对比表格,明确二者的核心区别:
| 对比维度 | ACID模型 | BASE模型 |
| 一致性模型 | 强一致性,事务执行的任何时刻,数据都处于一致状态 | 最终一致性,允许短暂的中间状态,最终数据达到一致 |
| 可用性要求 | 牺牲可用性保证强一致性,故障时事务直接回滚,服务不可用 | 牺牲强一致性保证基本可用,故障时核心链路仍可正常运行 |
| 事务特性 | 原子性、一致性、隔离性、持久性,事务是一个不可分割的整体 | 基本可用、软状态、最终一致性,允许事务分阶段异步执行 |
| 适用场景 | 单机/集中式系统,金融核心、账务等对数据一致性要求极高的场景 | 分布式系统,高并发、大流量的互联网业务,如电商、社交、内容平台 |
| 性能表现 | 并发性能低,跨节点事务性能急剧下降,无法支撑高并发场景 | 并发性能极高,支持横向扩展,可支撑百万级QPS的高并发场景 |
| 故障处理 | 故障时直接回滚,事务原子性保障,无中间状态 | 故障时通过重试、补偿、对账等机制兜底,保障最终一致性 |
接下来,我们辨析几个最容易混淆的核心概念,避免落地时出现原则性错误:
2.1 最终一致性≠弱一致性
弱一致性的定义是:数据写入成功后,系统不承诺后续的读操作能读取到最新的值,也不承诺多久之后数据能达到一致。 而最终一致性是弱一致性的一个特例,它在弱一致性的基础上,增加了一个明确的承诺:在经过一个可预期的时间窗口后,数据一定能达到一致的状态。
举个例子:微博的点赞数,用户点赞后,其他用户可能需要几秒后才能看到最新的点赞数,这是最终一致性;而如果系统不承诺点赞数最终会同步到所有节点,可能出现部分用户永远看不到最新的点赞数,这就是弱一致性。 工业界几乎不会使用纯弱一致性模型,绝大多数场景使用的都是最终一致性模型。
2.2 柔性事务≠分布式事务
分布式事务是一个广义的概念,指的是跨多个节点、多个服务、多个数据库的事务,包括刚性分布式事务(如2PC、3PC)和柔性分布式事务(基于BASE理论)。 而柔性事务是分布式事务的一个子集,特指基于BASE理论,牺牲强一致性换取可用性与性能,通过最终一致性保障数据正确性的分布式事务方案。 简单来说:柔性事务一定是分布式事务,但分布式事务不一定是柔性事务。
2.3 软状态≠事务未提交
ACID事务中的未提交状态,是事务执行过程中的中间状态,对外完全不可见,事务回滚后,所有数据都会恢复到之前的状态; 而BASE中的软状态,是对外可见的中间状态,是事务执行过程中的一个合法状态,即使系统出现故障,软状态也会被持久化,后续会通过异步处理推进到最终状态,不会随意回滚。 这是二者最核心的本质区别,很多开发者落地时踩坑,就是把软状态当成了未提交的临时状态,导致故障时出现数据丢失。
三、基于BASE理论的柔性事务设计与落地实现
柔性事务是BASE理论的核心落地载体,也是解决分布式事务痛点的核心方案。接下来,我们将详解工业界最常用的4种柔性事务设计模式,结合完整的可运行代码实例,帮你掌握从理论到落地的全流程。
3.1 可靠消息最终一致性方案
可靠消息最终一致性方案是工业界最常用的柔性事务方案,适用于绝大多数异步化的分布式事务场景,核心思想是:通过可靠的消息中间件,将分布式事务拆分为多个本地事务,通过消息的可靠投递与消费,保障最终一致性。
该方案有两种主流的实现方式:本地消息表方案、事务消息方案,我们先从最基础、最通用的本地消息表方案讲起。
3.1.1 本地消息表方案
本地消息表方案的核心原理是“本地事务+消息异步投递”,将分布式事务拆分为两个阶段: 第一阶段:业务操作与消息写入本地消息表,在同一个本地事务中执行,保证要么都成功,要么都失败,从根源上避免“业务执行了,消息没发出去”或者“消息发出去了,业务没执行”的问题。 第二阶段:通过异步任务,将本地消息表中待发送的消息投递到消息中间件,消费端接收到消息后,执行下游业务逻辑,完成分布式事务的闭环,保障最终一致性。
该方案的核心流程图如下:
该方案的核心优势:
- 实现简单,不依赖特殊的中间件,通用性极强,任何关系型数据库都可实现
- 性能极高,本地事务的执行效率接近单机事务,可支撑高并发场景
- 可观测性强,所有消息都持久化到本地表,可追溯、可排查、可手动补偿
- 稳定性高,不依赖分布式协调组件,故障概率极低
适用场景:绝大多数异步化的分布式事务场景,如订单创建后异步扣减库存、异步发放积分、异步通知物流、异步数据同步等。
接下来,我们给出完整的可运行代码实例,基于JDK17、Spring Boot 3.2.4、MyBatis Plus 3.5.6、RocketMQ 2.2.3、MySQL 8.0开发。
第一步:数据库表创建(MySQL 8.0)
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`user_id` bigint NOT NULL COMMENT '用户ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '购买数量',
`order_amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`order_status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表';
CREATE TABLE `t_local_message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '消息ID',
`message_id` varchar(64) NOT NULL COMMENT '消息唯一标识',
`message_body` text NOT NULL COMMENT '消息体JSON',
`message_topic` varchar(64) NOT NULL COMMENT '消息主题',
`message_status` tinyint NOT NULL DEFAULT '0' COMMENT '消息状态:0-待发送,1-已发送,2-处理完成,3-处理失败',
`retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`max_retry_count` int NOT NULL DEFAULT '10' COMMENT '最大重试次数',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`),
KEY `idx_status_create_time` (`message_status`,`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';
CREATE TABLE `t_product_stock` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '库存ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`stock_num` int NOT NULL DEFAULT '0' COMMENT '可用库存',
`frozen_stock_num` int NOT NULL DEFAULT '0' COMMENT '冻结库存',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品库存表';
第二步:Maven pom.xml 依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.jam.demo</groupId>
<artifactId>base-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>base-demo</name>
<description>BASE理论柔性事务demo</description>
<properties>
<java.version>17</java.version>
<mybatis-plus.version>3.5.6</mybatis-plus.version>
<rocketmq.version>2.2.3</rocketmq.version>
<fastjson2.version>2.0.49</fastjson2.version>
<guava.version>33.1.0-jre</guava.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
第三步:核心实体类定义
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体类
* @author ken
*/
@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order {
@TableId(type = IdType.AUTO)
@Schema(description = "订单ID")
private Long id;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "商品ID")
private Long productId;
@Schema(description = "购买数量")
private Integer quantity;
@Schema(description = "订单金额")
private BigDecimal orderAmount;
@Schema(description = "订单状态:0-待支付,1-已支付,2-已取消")
private Integer orderStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 本地消息实体类
* @author ken
*/
@Data
@TableName("t_local_message")
@Schema(description = "本地消息实体")
public class LocalMessage {
@TableId(type = IdType.AUTO)
@Schema(description = "消息ID")
private Long id;
@Schema(description = "消息唯一标识")
private String messageId;
@Schema(description = "消息体JSON")
private String messageBody;
@Schema(description = "消息主题")
private String messageTopic;
@Schema(description = "消息状态:0-待发送,1-已发送,2-处理完成,3-处理失败")
private Integer messageStatus;
@Schema(description = "重试次数")
private Integer retryCount;
@Schema(description = "最大重试次数")
private Integer maxRetryCount;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 商品库存实体类
* @author ken
*/
@Data
@TableName("t_product_stock")
@Schema(description = "商品库存实体")
public class ProductStock {
@TableId(type = IdType.AUTO)
@Schema(description = "库存ID")
private Long id;
@Schema(description = "商品ID")
private Long productId;
@Schema(description = "可用库存")
private Integer stockNum;
@Schema(description = "冻结库存")
private Integer frozenStockNum;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
第四步:Mapper接口定义
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper接口
* @author ken
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.LocalMessage;
import org.apache.ibatis.annotations.Mapper;
/**
* 本地消息Mapper接口
* @author ken
*/
@Mapper
public interface LocalMessageMapper extends BaseMapper<LocalMessage> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.ProductStock;
import org.apache.ibatis.annotations.Mapper;
/**
* 商品库存Mapper接口
* @author ken
*/
@Mapper
public interface ProductStockMapper extends BaseMapper<ProductStock> {
}
第五步:核心业务Service实现,采用编程式事务
package com.jam.demo.service;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.LocalMessage;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.LocalMessageMapper;
import com.jam.demo.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.UUID;
/**
* 订单服务类
* @author ken
*/
@Slf4j
@Service
public class OrderService {
private final OrderMapper orderMapper;
private final LocalMessageMapper localMessageMapper;
private final TransactionTemplate transactionTemplate;
private final RocketMQTemplate rocketMQTemplate;
public OrderService(OrderMapper orderMapper, LocalMessageMapper localMessageMapper,
TransactionTemplate transactionTemplate, RocketMQTemplate rocketMQTemplate) {
this.orderMapper = orderMapper;
this.localMessageMapper = localMessageMapper;
this.transactionTemplate = transactionTemplate;
this.rocketMQTemplate = rocketMQTemplate;
}
/**
* 创建订单
* @param order 订单信息
* @return 订单编号
*/
public String createOrder(Order order) {
if (ObjectUtils.isEmpty(order)) {
throw new IllegalArgumentException("订单信息不能为空");
}
if (!StringUtils.hasText(order.getOrderNo())) {
order.setOrderNo(UUID.randomUUID().toString().replace("-", ""));
}
if (ObjectUtils.isEmpty(order.getUserId())) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(order.getProductId())) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (ObjectUtils.isEmpty(order.getQuantity()) || order.getQuantity() <= 0) {
throw new IllegalArgumentException("购买数量必须大于0");
}
if (ObjectUtils.isEmpty(order.getOrderAmount()) || order.getOrderAmount().signum() <= 0) {
throw new IllegalArgumentException("订单金额必须大于0");
}
order.setOrderStatus(0);
String messageId = UUID.randomUUID().toString().replace("-", "");
String topic = "order_create_topic";
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
orderMapper.insert(order);
log.info("订单创建成功,订单编号:{}", order.getOrderNo());
LocalMessage localMessage = new LocalMessage();
localMessage.setMessageId(messageId);
localMessage.setMessageBody(JSON.toJSONString(order));
localMessage.setMessageTopic(topic);
localMessage.setMessageStatus(0);
localMessage.setRetryCount(0);
localMessage.setMaxRetryCount(10);
localMessageMapper.insert(localMessage);
log.info("本地消息插入成功,消息ID:{}", messageId);
} catch (Exception e) {
log.error("订单创建事务执行失败,回滚", e);
status.setRollbackOnly();
throw new RuntimeException("订单创建失败", e);
}
}
});
try {
rocketMQTemplate.convertAndSend(topic, JSON.toJSONString(order));
LocalMessage updateMessage = new LocalMessage();
updateMessage.setMessageStatus(1);
localMessageMapper.update(updateMessage,
new LambdaQueryWrapper<LocalMessage>().eq(LocalMessage::getMessageId, messageId));
log.info("消息投递成功,消息ID:{}", messageId);
} catch (Exception e) {
log.error("消息投递失败,等待定时任务重试,消息ID:{}", messageId, e);
}
return order.getOrderNo();
}
}
第六步:消息投递定时任务
package com.jam.demo.task;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jam.demo.entity.LocalMessage;
import com.jam.demo.mapper.LocalMessageMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 本地消息投递定时任务
* @author ken
*/
@Slf4j
@Component
public class LocalMessageSendTask {
private final LocalMessageMapper localMessageMapper;
private final RocketMQTemplate rocketMQTemplate;
public LocalMessageSendTask(LocalMessageMapper localMessageMapper, RocketMQTemplate rocketMQTemplate) {
this.localMessageMapper = localMessageMapper;
this.rocketMQTemplate = rocketMQTemplate;
}
@Scheduled(fixedRate = 10000)
public void sendPendingMessage() {
List<LocalMessage> pendingMessageList = localMessageMapper.selectList(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<LocalMessage>()
.eq(LocalMessage::getMessageStatus, 0)
.lt(LocalMessage::getRetryCount, LocalMessage::getMaxRetryCount)
.last("limit 100")
);
if (CollectionUtils.isEmpty(pendingMessageList)) {
return;
}
for (LocalMessage message : pendingMessageList) {
try {
rocketMQTemplate.convertAndSend(message.getMessageTopic(), message.getMessageBody());
localMessageMapper.update(
new LambdaUpdateWrapper<LocalMessage>()
.set(LocalMessage::getMessageStatus, 1)
.set(LocalMessage::getRetryCount, message.getRetryCount() + 1)
.eq(LocalMessage::getId, message.getId())
);
log.info("定时任务投递消息成功,消息ID:{}", message.getMessageId());
} catch (Exception e) {
localMessageMapper.update(
new LambdaUpdateWrapper<LocalMessage>()
.set(LocalMessage::getRetryCount, message.getRetryCount() + 1)
.eq(LocalMessage::getId, message.getId())
);
log.error("定时任务投递消息失败,消息ID:{},重试次数:{}",
message.getMessageId(), message.getRetryCount() + 1, e);
}
}
}
@Scheduled(fixedRate = 60000)
public void handleFailMessage() {
List<LocalMessage> failMessageList = localMessageMapper.selectList(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<LocalMessage>()
.eq(LocalMessage::getMessageStatus, 0)
.ge(LocalMessage::getRetryCount, LocalMessage::getMaxRetryCount)
);
if (CollectionUtils.isEmpty(failMessageList)) {
return;
}
for (LocalMessage message : failMessageList) {
localMessageMapper.update(
new LambdaUpdateWrapper<LocalMessage>()
.set(LocalMessage::getMessageStatus, 3)
.eq(LocalMessage::getId, message.getId())
);
log.error("消息超过最大重试次数,标记为失败,消息ID:{}", message.getMessageId());
}
}
}
第七步:MQ消费端,实现幂等性处理
package com.jam.demo.mq;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.entity.ProductStock;
import com.jam.demo.mapper.ProductStockMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
/**
* 订单创建消息消费者
* @author ken
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_create_topic", consumerGroup = "order_create_consumer_group")
public class OrderCreateMessageConsumer implements RocketMQListener<String> {
private final ProductStockMapper productStockMapper;
private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "order:stock:idempotent:";
private static final long IDEMPOTENT_KEY_EXPIRE_DAYS = 7;
public OrderCreateMessageConsumer(ProductStockMapper productStockMapper, StringRedisTemplate stringRedisTemplate) {
this.productStockMapper = productStockMapper;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public void onMessage(String message) {
log.info("接收到订单创建消息,消息内容:{}", message);
Order order = JSON.parseObject(message, Order.class);
if (ObjectUtils.isEmpty(order)) {
log.error("消息解析失败,消息内容:{}", message);
return;
}
String orderNo = order.getOrderNo();
String idempotentKey = IDEMPOTENT_KEY_PREFIX + orderNo;
Boolean isFirstProcess = stringRedisTemplate.opsForValue()
.setIfAbsent(idempotentKey, "1", IDEMPOTENT_KEY_EXPIRE_DAYS, TimeUnit.DAYS);
if (Boolean.FALSE.equals(isFirstProcess)) {
log.info("消息重复处理,直接跳过,订单编号:{}", orderNo);
return;
}
try {
int updateRow = productStockMapper.update(
new LambdaUpdateWrapper<ProductStock>()
.setSql("stock_num = stock_num - " + order.getQuantity())
.setSql("frozen_stock_num = frozen_stock_num + " + order.getQuantity())
.eq(ProductStock::getProductId, order.getProductId())
.ge(ProductStock::getStockNum, order.getQuantity())
);
if (updateRow <= 0) {
log.error("库存扣减失败,库存不足,订单编号:{},商品ID:{}", orderNo, order.getProductId());
throw new RuntimeException("库存不足,扣减失败");
}
log.info("库存扣减成功,订单编号:{},商品ID:{},扣减数量:{}",
orderNo, order.getProductId(), order.getQuantity());
} catch (Exception e) {
stringRedisTemplate.delete(idempotentKey);
log.error("订单消息处理失败,订单编号:{}", orderNo, e);
throw new RuntimeException("消息处理失败", e);
}
}
}
第八步:Controller层,添加swagger3注解
package com.jam.demo.controller;
import com.jam.demo.entity.Order;
import com.jam.demo.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 订单控制器
* @author ken
*/
@RestController
@RequestMapping("/order")
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
private final OrderService orderService;
public OrderController(OrderService orderService) {
this.orderService = orderService;
}
@PostMapping("/create")
@Operation(summary = "创建订单", description = "创建订单并异步扣减库存,基于本地消息表实现最终一致性")
public ResponseEntity<String> createOrder(@RequestBody Order order) {
String orderNo = orderService.createOrder(order);
return ResponseEntity.ok(orderNo);
}
}
第九步:启动类与配置文件
启动类:
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* 启动类
* @author ken
*/
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
@EnableScheduling
public class BaseDemoApplication {
public static void main(String[] args) {
SpringApplication.run(BaseDemoApplication.class, args);
}
}
application.yml配置文件:
spring:
application:
name: base-demo
datasource:
url: jdbc:mysql://localhost:3306/base_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: your_password
driver-class-name: com.mysql.cj.jdbc.Driver
data:
redis:
host: localhost
port: 6379
password: your_redis_password
database: 0
server:
port: 8080
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
global-config:
db-config:
id-type: auto
rocketmq:
name-server: localhost:9876
producer:
group: base-demo-producer-group
springdoc:
swagger-ui:
path: /swagger-ui.html
api-docs:
path: /v3/api-docs
3.1.2 事务消息方案
事务消息方案是对本地消息表方案的优化,将本地消息表的逻辑放到了消息中间件中实现,避免了本地消息表与业务表的耦合,主流的RocketMQ、Kafka都支持事务消息功能。 其核心原理是通过2PC的方式,保障消息的发送与本地事务的执行要么都成功,要么都失败,核心流程如下:
- 生产者向MQ发送半消息(Half Message),半消息对消费者不可见
- MQ响应半消息发送成功
- 生产者执行本地事务
- 生产者根据本地事务执行结果,向MQ发送Commit或Rollback请求
- MQ收到Commit请求后,将半消息标记为可投递,消费者可正常消费;收到Rollback请求后,删除半消息
- 若MQ长时间未收到Commit或Rollback请求,会回查生产者的本地事务状态,根据回查结果决定Commit或Rollback
该方案的核心优势是消除了本地消息表与业务表的耦合,架构更简洁,性能更高;缺点是强依赖消息中间件的事务消息功能,通用性不如本地消息表方案。
3.2 TCC(Try-Confirm-Cancel)模式
TCC模式是一种同步的柔性事务方案,适用于对一致性要求较高、需要同步返回结果的分布式事务场景,核心思想是将分布式事务拆分为Try、Confirm、Cancel三个阶段,由业务代码实现这三个阶段的逻辑,通过TCC协调器统一调度,保障最终一致性。
三个阶段的核心定义:
- Try阶段:资源预留与检查,完成所有业务的合法性校验,预留必要的业务资源,锁定资源避免被其他事务修改
- Confirm阶段:确认提交,执行真正的业务操作,使用Try阶段预留的资源,该阶段必须保证幂等性,且不允许执行失败
- Cancel阶段:取消回滚,释放Try阶段预留的资源,回滚业务操作,该阶段也必须保证幂等性,且不允许执行失败
TCC模式的核心流程图如下:
TCC模式的核心优势:
- 同步执行,可实时返回事务执行结果,适合需要同步响应的业务场景
- 性能高,不依赖数据库的本地事务,可实现跨数据库、跨服务的事务
- 灵活性强,业务逻辑完全由开发者控制,可适配各种复杂的业务场景
缺点:
- 代码侵入性强,每个业务都需要实现Try、Confirm、Cancel三个接口,开发成本高
- 开发难度大,需要处理幂等性、空回滚、悬挂等各种异常场景
- 依赖TCC协调器,存在单点故障风险
适用场景:对一致性要求较高的同步分布式事务场景,如支付转账、订单支付、跨库的核心业务操作等。
3.3 SAGA模式
SAGA模式适用于长事务场景,核心思想是将长事务拆分为多个本地短事务,由SAGA协调器按照顺序依次执行,如果所有本地事务都执行成功,则事务提交成功;如果某个本地事务执行失败,则按照相反的顺序执行补偿事务,回滚已经执行的操作,保障最终一致性。
SAGA模式有两种实现方式:
- 正向执行+反向补偿:正常情况下按照顺序执行正向事务,出现异常时执行反向补偿事务,回滚之前的操作
- 状态机模式:将每个事务步骤定义为状态机的一个节点,通过状态机的流转控制事务的执行与回滚,可实现更复杂的事务控制
SAGA模式的核心优势:
- 适合长事务场景,不会长时间锁定资源,并发性能高
- 代码侵入性比TCC低,只需要实现正向操作和补偿操作
- 无锁设计,不会出现死锁问题
缺点:
- 只能保证最终一致性,无法实现隔离性,可能出现脏读、不可重复读的问题
- 补偿操作的开发难度大,需要保证补偿操作的幂等性与正确性
- 复杂场景下的事务状态管理难度大
适用场景:长事务场景,如订单履约流程、物流配送流程、跨多个服务的长链路业务操作等。
3.4 最大努力通知方案
最大努力通知方案是最终一致性要求最低的柔性事务方案,核心思想是:发起通知方尽最大的努力将业务处理结果通知给接收方,但是不保证通知一定能送达,接收方需要主动查询业务结果,保障最终一致性。
该方案的核心实现逻辑:
- 业务处理完成后,发起方按照一定的策略(如1分钟、5分钟、10分钟、30分钟)重复发送通知,直到接收方成功响应,达到最大通知次数后停止发送
- 接收方提供校验接口,可主动查询业务的最终结果,处理漏通知的情况
- 接收方处理通知时,必须保证幂等性,避免重复处理
适用场景:对一致性要求较低的通知类场景,如短信通知、邮件通知、微信/支付宝支付结果回调、物流状态通知等。
四、柔性事务落地的核心保障与避坑指南
基于BASE理论的柔性事务,核心目标是保障最终一致性,而最终一致性的落地,有几个必须实现的核心保障,也是绝大多数开发者踩坑的重灾区,我们逐一讲解。
4.1 幂等性:最终一致性的基石
幂等性的定义是:同一个操作,无论执行多少次,得到的结果都是一样的,不会因为多次执行而出现数据错乱。 在柔性事务中,重试机制是标配,而重试必然会导致消息或接口被多次调用,如果没有实现幂等性,就会出现重复扣减库存、重复发放积分、重复生成订单等严重的业务问题。
工业界主流的幂等性实现方案:
- 唯一ID+去重表:为每个操作生成唯一的业务ID,在执行业务前先查询去重表,若该ID已经处理过,直接返回成功;否则执行业务,并将ID插入去重表,与业务操作在同一个本地事务中执行。该方案可靠性最高,适合核心业务场景。
- Redis setNX:为每个操作生成唯一的业务ID,执行业务前先通过setNX设置key,若设置成功,说明是第一次执行,执行业务;否则直接返回成功。该方案性能最高,适合高并发场景。
- 乐观锁:通过版本号机制实现,更新数据时校验版本号,只有版本号匹配时才更新成功,避免重复更新。该方案适合更新操作的幂等性实现。
- 状态机幂等:通过业务状态的流转控制,只有当业务处于指定状态时,才允许执行操作,否则直接拒绝。该方案适合有明确状态流转的业务场景,如订单状态管理。
4.2 重试机制:最终一致性的核心保障
重试机制是柔性事务中处理异常的核心手段,但是不合理的重试会导致系统雪崩,必须遵循以下核心原则:
- 有限重试:必须设置最大重试次数,不能无限重试,避免异常持续累积,导致系统资源耗尽。
- 退避策略:重试间隔必须采用指数退避策略,如第一次重试间隔10秒,第二次20秒,第三次40秒,直到达到最大间隔,避免集中重试导致下游服务被打垮。
- 异常分类:必须区分可重试异常和不可重试异常,只有临时的、可恢复的异常(如网络超时、数据库连接失败)才允许重试;业务异常(如参数错误、库存不足)不允许重试,直接标记为失败,人工处理。
- 死信队列:超过最大重试次数的消息,必须放入死信队列,接入告警通知,人工处理,不能直接丢弃,避免数据不一致。
4.3 隔离性保障:柔性事务的短板补齐
柔性事务基于BASE理论,牺牲了强一致性,同时也放弃了ACID的隔离性,可能出现脏读、不可重复读、幻读的问题,必须通过业务手段补齐隔离性,常见的实现方案:
- 资源预留:在Try阶段锁定业务资源,避免其他事务修改,如TCC模式的资源预留。
- 串行化控制:通过分布式锁,将同一个业务资源的操作串行化执行,避免并发修改。
- 版本号控制:为业务数据添加版本号,更新时校验版本号,避免并发修改导致的数据错乱。
- 状态机控制:通过业务状态的流转,限制数据的修改权限,只有处于指定状态的数据才能被修改。
4.4 兜底补偿:最终一致性的最后一道防线
无论重试机制多么完善,总会出现无法自动恢复的异常情况,必须有兜底补偿机制,保障最终一致性,常见的兜底方案:
- 定时对账:每天凌晨定时执行对账任务,对比上下游系统的数据,找出不一致的数据,自动执行补偿操作,或生成对账异常工单,人工处理。
- 人工干预:对于超过最大重试次数、无法自动恢复的异常,必须接入告警通知,人工介入处理,修复数据,保障最终一致性。
- 补偿接口:为每个业务操作提供对应的补偿接口,支持手动触发补偿操作,处理异常情况。
五、BASE理论落地的场景选择与最佳实践
BASE理论不是银弹,不是所有场景都适合使用,落地时必须根据业务场景,选择合适的事务模型,我们给出明确的场景选择标准与最佳实践。
5.1 场景选择标准
- 必须使用ACID强一致性事务的场景:
- 金融核心账务系统,如账户余额管理、资金划转、会计记账等
- 与钱相关的核心交易场景,如支付结算、对账清算等
- 对数据一致性要求极高,不允许出现任何数据不一致的场景
- 适合使用BASE柔性事务的场景:
- 高并发、大流量的互联网业务场景,如电商订单、库存管理、社交互动、内容平台等
- 跨服务、跨数据库的分布式事务场景,可接受短暂的数据不一致
- 对系统可用性、并发性能要求极高,可牺牲强一致性换取可用性的场景
- 业务链路长,可异步化处理的场景
5.2 最佳实践
- 能不用分布式事务就不用:优先通过业务设计,避免分布式事务的产生,如将相关的业务数据放到同一个数据库中,使用本地ACID事务,这是最高效、最可靠的方案。
- 优先选择可靠消息最终一致性方案:绝大多数异步场景,优先使用可靠消息最终一致性方案,实现简单、性能高、稳定性强,是工业界的首选方案。
- 同步场景优先选择TCC模式:对于需要同步返回结果、一致性要求较高的场景,优先选择TCC模式,避免使用2PC等刚性分布式事务方案,性能更高,可用性更强。
- 长事务场景优先选择SAGA模式:对于链路长、步骤多的长事务场景,优先选择SAGA模式,避免长时间锁定资源,提升系统并发性能。
- 通知类场景优先选择最大努力通知方案:对于通知类、非核心的业务场景,优先选择最大努力通知方案,实现简单,性能高。
- 幂等性必须前置:所有的写操作、所有的柔性事务接口,必须先实现幂等性,再开发业务逻辑,这是不可突破的底线。
- 可观测性必须完善:所有的柔性事务操作,必须有完整的日志、链路追踪、监控告警,可追溯、可排查、可监控,出现异常时能第一时间发现并处理。
- 必须有兜底机制:所有的柔性事务方案,必须设计兜底补偿机制,即使所有的自动处理都失败了,也能通过人工干预保障数据的最终一致性。
总结
BASE理论的核心,不是放弃一致性,而是在分布式系统的核心矛盾中,找到一致性、可用性、性能三者的最佳平衡点。它告诉我们,在大规模分布式场景下,强一致性不是唯一的选择,通过最终一致性,我们可以实现系统的高可用、高并发、高扩展,同时保障业务数据的最终正确性。
柔性事务是BASE理论的落地载体,无论是可靠消息最终一致性、TCC、SAGA还是最大努力通知,核心目标都是保障最终一致性。落地时,我们不需要追求最复杂的方案,而是要根据业务场景,选择最简单、最可靠、最适合的方案,同时守住幂等性、重试机制、兜底补偿这三个核心底线,才能真正解决分布式事务的痛点,实现系统的稳定运行。