分布式事务中间件对⽐与选择
- tx-lcn
- EasyTransaction
- ByteTCC
- Seata
Seata实现分布式事务
我们主要以Seata的分布式事务框架进行介绍分析,相关的并且针对于其三种模式进行分别说明介绍。
搭建Seata Server
前往github.com/seata/seata… 下载Seata安装包,本书使⽤Seata 1.0.0。将⽬录切换⾄Seata根⽬录,根据操作系统,执⾏对应命令,即可启动Seata Server。
Linux/Unix/Mac
sh ./bin/seata-server.sh 复制代码
Windows
bin\seata-server.bat 复制代码
启动时,也可指定参数
$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file 复制代码
⽀持的参数如下表所示
Seata AT模式
- ⼀阶段:业务数据和回滚⽇志记录在同⼀个本地事务中提交,释放本地锁和连接资源。
- ⼆阶段:提交异步化,⾮常快速地完成。回滚通过⼀阶段的回滚⽇志进⾏反向补偿。
官⽅⽂档
代码演示
Maven依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> 复制代码
配置
seata: tx-service-group: content-center-seata-service-group service: vgroup-mapping: content-center-seata-service-group: seata-cluster grouplist: seata-cluster: 127.0.0.1:8091 disable-global-transaction: false 复制代码
配置说明
- tx-service-group:事务分组,默认是 ${spring.application.name}-seata-service-group ,唯⼀即可。
- vgroup-mapping:事务分组映射,表示 tx-service-group 对应到哪个Seata Server集群。
- key是tx-service-group的值,value是集群名称,唯⼀即可
- grouplist:集群中所包含的Seata Server的地址列表,key是vgroup-mapping中value的值,
- value是Seata Server的地址列表
- disable-global-transaction:是否开启全局事务开关,默认false。
在Seata1.0.0中,该配置⽆法正常读取,这是⼀个Bug,详⻅ github.com/seata/seata… ,好在,该配置的默认值就是false,所以不影响使⽤。
创建Seata事务记录表
-- auto-generated definition create table undo_log ( id bigint auto_increment comment 'increment id' primary key, branch_id bigint not null comment 'branch transaction id', xid varchar(100) not null comment 'global transaction id', context varchar(128) not null comment 'undo_log context,such as serialization', rollback_info longblob not null comment 'rollback info', log_status int not null comment '0:normal status,1:defense status', log_created datetime not null comment 'create datetime', log_modified datetime not null comment 'modify datetime', constraint ux_undo_log unique (xid, branch_id) ) comment 'AT transaction mode undo table' charset = utf8; 复制代码
Controller代码:
private final ShareSeataService shareSeataService; @PutMapping("/audit/seata1/{id}") public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) { return this.shareSeataService.auditById(id, auditDTO); } 复制代码
Service代码:
- 审核中心服务代码(含调用用户中心代码接口)
@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareSeataService { private final ShareMapper shareMapper; private final UserCenterFeignClient userCenterFeignClient; @GlobalTransactional(rollbackFor = Exception.class) public Share auditById(Integer id, ShareAuditDTO auditDTO) { if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) { userCenterFeignClient.addBonus(id, 50); // 故意抛异常,如果⽤户中⼼侧也能回滚,说明实现了分布式事务 // throw new IllegalArgumentException("发⽣异常..."); } this.auditByIdInDB(id, auditDTO); return this.shareMapper.selectByPrimaryKey(id); } 复制代码
- 审核中心服务代码(执行操作更新数据库机制)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) { Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()) .build(); this.shareMapper.updateByPrimaryKeySelective(share); } 复制代码
@GlobalTransactional 注解⽤来创建分布式事务。
被调⽤⽅代码
Maven依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> 复制代码
配置
seata: tx-service-group: user-center-seata-service-group service: vgroup-mapping: user-center-seata-service-group: seata-cluster grouplist: seata-cluster: 127.0.0.1:8091 disable-global-transaction: false 复制代码
可以看出来,差距主要体现在tx-service-group的值。
Controller代码:
@GetMapping("/add-bonus/{id}/{bonus}") public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus) { this.userService.addBonus(id, bonus); return this.userService.findById(id); } 复制代码
Service代码:
@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class UserService { private final UserMapper userMapper; private final BonusEventLogMapper bonusEventLogMapper; public User findById(Integer id) { // select * from user where id = #{id} return this.userMapper.selectByPrimaryKey(id); } public void addBonus(Integer userId, Integer bonus) { // 1. 为⽤户加积分 User user = this.userMapper.selectByPrimaryKey(userId); user.setBonus(user.getBonus() + bonus); this.userMapper.updateByPrimaryKeySelective(user); // 2. 记录⽇志到bonus_event_log表⾥⾯ this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build()); log.info("积分添加完毕..."); } } 复制代码
Seata TCC模式
- ⼀阶段 prepare ⾏为
- ⼆阶段 commit 或 rollback ⾏为
需要实现的3个⽅法:
- ⼀阶段:
- ⽤于业务预处理的⽅法,即 Try 阶段、的⽅法,⽐如冻结⽤户的部分余额等等;
- ⼆阶段:
- ⽤于提交业务的⽅法,即 Commit ⽅法,⽐如扣除⽤户之前冻结的部分余额;
- ⽤于回滚业务的⽅法,即 Rollback ⽅法,⽐如返还之前冻结的⽤户余额;
官⽅⽂档
代码演示
行为操作接口
@LocalTCC public interface TccActionOne { @TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback") boolean prepare(BusinessActionContext actionContext, int a); boolean commit(BusinessActionContext actionContext); boolean rollback(BusinessActionContext actionContext); } 复制代码
接口实现类
- 实现类1
@Component public class TccActionOneImpl implements TccActionOne { @Override public boolean prepare(BusinessActionContext actionContext, int a) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionOne prepare, xid:" + xid); return true; } @Override public boolean commit(BusinessActionContext actionContext) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionOne commit, xid:" + xid); ResultHolder.setActionOneResult(xid, "T"); return true; } @Override public boolean rollback(BusinessActionContext actionContext) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionOne rollback, xid:" + xid); ResultHolder.setActionOneResult(xid, "R"); return true; } } 复制代码
@LocalTCC public interface TccActionTwo { @TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback") boolean prepare(BusinessActionContext actionContext, int a); boolean commit(BusinessActionContext actionContext); boolean rollback(BusinessActionContext actionContext); } 复制代码
- 实现类2
@Component public class TccActionTwoImpl implements TccActionTwo { @Override public boolean prepare(BusinessActionContext actionContext, String b) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionTwo prepare, xid:" + xid); return true; } @Override public boolean commit(BusinessActionContext actionContext) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionTwo commit, xid:" + xid); ResultHolder.setActionTwoResult(xid, "T"); return true; } @Override public boolean rollback(BusinessActionContext actionContext) { // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝ String xid = actionContext.getXid(); System.out.println("TccActionTwo rollback, xid:" + xid); ResultHolder.setActionTwoResult(xid, "R"); return true; } } 复制代码
- 聚合实现服务业务实现类执行
@Service public class ShareSeataService{ @Autowired TccActionOne tccActionOne; @Autowired TccActionTwo tccActionTwo; @GlobalTransactional public void tccTransactionCommit(Map<String, String> paramMap) { //第一个TCC 事务参与者 boolean result = tccActionOne.prepare(null, "one"); if (!result) { paramMap.put("xid",RootContext.getXID()); throw new RuntimeException("TccActionOne failed."); } List list = new ArrayList(); list.add("c1"); list.add("c2"); result = tccActionTwo.prepare(null, "two"); if (!result) { paramMap.put("xid",RootContext.getXID()); throw new RuntimeException("TccActionTwo failed."); } paramMap.put("xid",RootContext.getXID()); return ; } } // 回滚的代码相似,就不写了 复制代码
- 执行调用点
@Slf4j @RestController @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareAdminController { private final ShareSeataService shareSeataService; @GetMapping("tcc-commit") public String tccTransactionCommit() { Map<String, String> map = new HashMap<>(); this.shareSeataService.tccTransactionCommit(map); String xid = map.get("xid"); // 结果T return ResultHolder.getActionOneResult(xid); } @GetMapping("/tcc-rollback") public String tccTransactionRollback() { Map<String, String> map = new HashMap<>(); try { this.shareSeataService.tccTransactionRollback(map); } catch (Throwable t) { log.warn("事务回滚..", t); } String xid = map.get("xid"); // 结果R return ResultHolder.getActionOneResult(xid); } } 复制代码
定义状态机⽂件:
{ "Name": "reduceInventoryAndBalance", "Comment": "reduce inventory then reduce balance in a transaction", "StartState": "ReduceInventory", "Version": "0.0.1", "States": { "ReduceInventory": { "Type": "ServiceTask", "ServiceName": "inventoryAction", "ServiceMethod": "reduce", "CompensateState": "CompensateReduceInventory", "Next": "ChoiceState", "Input": [ "$.[businessKey]", "$.[count]" ], "Output": { "reduceInventoryResult": "$.#root" }, "Status": { "#root == true": "SU", "#root == false": "FA", "$Exception{java.lang.Throwable}": "UN" } }, "ChoiceState": { "Type": "Choice", "Choices": [ { "Expression": "[reduceInventoryResult] == true", "Next": "ReduceBalance" } ], "Default": "Fail" }, "ReduceBalance": { "Type": "ServiceTask", "ServiceName": "balanceAction", "ServiceMethod": "reduce", "CompensateState": "CompensateReduceBalance", "Input": [ "$.[businessKey]", "$.[amount]", { "throwException": "$.[mockReduceBalanceFail]"分布式事务27 } ], "Output": { "compensateReduceBalanceResult": "$.#root" }, "Status": { "#root == true": "SU", "#root == false": "FA", "$Exception{java.lang.Throwable}": "UN" }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger" } ], "Next": "Succeed" }, "CompensateReduceInventory": { "Type": "ServiceTask", "ServiceName": "inventoryAction", "ServiceMethod": "compensateReduce", "Input": [ "$.[businessKey]" ] }, "CompensateReduceBalance": { "Type": "ServiceTask", "ServiceName": "balanceAction", "ServiceMethod": "compensateReduce", "Input": [ "$.[businessKey]" ] }, "CompensationTrigger": { "Type": "CompensationTrigger", "Next": "Fail" }, "Succeed": { "Type": "Succeed" }, "Fail": { "Type": "Fail", "ErrorCode": "PURCHASE_FAILED", "Message": "purchase failed" } } } 复制代码
测试代码:
public class LocalSagaTransactionStarter { public static void main(String[] args) { AbstractApplicationContext applicationContext = new ClassPathXmlApplication Context(new String[] {"spring/seata-saga.xml"}); StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationCon text.getBean("stateMachineEngine"); transactionCommittedDemo(stateMachineEngine); transactionCompensatedDemo(stateMachineEngine); new ApplicationKeeper(applicationContext).keep(); } private static void transactionCommittedDemo(StateMachineEngine stateMachineEng ine) { Map<String, Object> startParams = new HashMap<>(3); String businessKey = String.valueOf(System.currentTimeMillis()); startParams.put("businessKey", businessKey); startParams.put("count", 10); startParams.put("amount", new BigDecimal("100")); //sync test StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce InventoryAndBalance", null, businessKey, startParams); Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio n execute failed. XID: " + inst.getId()); System.out.println("saga transaction commit succeed. XID: " + inst.getId()) ; inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getSta teMachineInstanceByBusinessKey(businessKey, null); Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio n execute failed. XID: " + inst.getId()); //async test businessKey = String.valueOf(System.currentTimeMillis()); inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala nce", null, businessKey, startParams, CALL_BACK); waittingForFinish(inst); Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio n execute failed. XID: " + inst.getId()); 分布式事务 29 System.out.println("saga transaction commit succeed. XID: " + inst.getId()) ; } private static void transactionCompensatedDemo(StateMachineEngine stateMachineE ngine) { Map<String, Object> startParams = new HashMap<>(4); String businessKey = String.valueOf(System.currentTimeMillis()); startParams.put("businessKey", businessKey); startParams.put("count", 10); startParams.put("amount", new BigDecimal("100")); startParams.put("mockReduceBalanceFail", "true"); //sync test StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce InventoryAndBalance", null, businessKey, startParams); //async test businessKey = String.valueOf(System.currentTimeMillis()); inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala nce", null, businessKey, startParams, CALL_BACK); waittingForFinish(inst); Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "sag a transaction compensate failed. XID: " + inst.getId()); System.out.println("saga transaction compensate succeed. XID: " + inst.getI d()); } private static volatile Object lock = new Object(); private static AsyncCallback CALL_BACK = new AsyncCallback() { @Override public void onFinished(ProcessContext context, StateMachineInstance stateMach ineInstance) { synchronized (lock){ lock.notifyAll(); } } @Override public void onError(ProcessContext context, StateMachineInstance stateMachine Instance, Exception exp) { synchronized (lock){ lock.notifyAll(); } } }; private static void waittingForFinish(StateMachineInstance inst){ synchronized (lock){ if(ExecutionStatus.RU.equals(inst.getStatus())){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } 复制代码
Seata saga模式
Saga 事务:最终一致性
方案简介
Saga 事务核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
处理流程
Saga事务基本协议
- 每个 Saga 事务由一系列幂等的有序子事务(sub-transaction) Ti 组成。
- 每个 Ti 都有对应的幂等补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果。
- 可以看到,和 TCC 相比,Saga 没有“预留”动作,它的 Ti 就是直接提交到库。
代码实现
// 接⼝略 public class BalanceActionImpl implements BalanceAction { private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class); @Override public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) { if(params != null && "true".equals(params.get("throwException"))){ throw new RuntimeException("reduce balance failed"); } LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey); return true; } @Override public boolean compensateReduce(String businessKey, Map<String, Object> params) { if(params != null && "true".equals(params.get("throwException"))){ throw new RuntimeException("compensate reduce balance failed"); } LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey); return true; } } // 接⼝略 public class InventoryActionImpl implements InventoryAction { private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class); @Override public boolean reduce(String businessKey, int count) { LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey); return true; } @Override public boolean compensateReduce(String businessKey) { LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey); return true; } } 复制代码
Seata XA模式
官⽅⽂档
代码演示
- 添加Seata的依赖 & 配置
- 你平时的JDBC代码怎么写,依然怎么写。
4种模式对⽐与选择
AT
优势:
- 使⽤简单:对业务侵⼊性⼩
缺点:
- 性能中等
- 有全局锁
适⽤场景:
- 适⽤于对性能没有特别⾼的要求的场景
- 适⽤于不希望对业务进⾏改造的场景
TCC
优势
- 性能会⽐ AT 模式⾼很多
缺点
相对于 AT 模式,TCC 模式对业务代码有⼀定的侵⼊性
- 适⽤场景: 适⽤于核⼼系统等对性能有很⾼要求的场景
Saga
优势:
- ⼀阶段提交本地数据库事务,⽆锁,⾼性能;
- 参与者可以采⽤事务驱动异步执⾏,⾼吞吐;
- 补偿服务即正向服务的“反向”,易于理解,易于实现;
缺点:
- Saga 模式由于⼀阶段已经提交本地数据库事务,且没有进⾏“预留”动作,所以不能保证隔离性。后续会讲到对于缺乏隔离性的应对措施。
适⽤场景:
- 业务流程⻓/多
- 参与者包含其他公司或遗留系统服务,⽆法提供 TCC 模式要求的三个接⼝
- 典型业务系统:如⾦融⽹络(与外部⾦融机构对接)、互联⽹微贷、渠道整合、分布式架构服务
- 集成等业务系统
- 银⾏业⾦融机构使⽤⼴泛
XA
优势:
- ⽆侵⼊
缺点:
- 性能较差
- 需要数据库⽀持XA
适⽤场景:
- 强⼀致性的解决⽅案,适⽤于对⼀致性要求⾮常⾼的场景(使⽤较少)