Hmily实现TCC事务_转入转出微服务实现Confirm阶段
编写转出微服务Confirm阶段
/** * 确认阶段 * @param userAccountDTO */ public void sayConfrim(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank01 的 Confrim方法 ,事务id={}",txNo); // 1、幂等处理 ConfirmLog confirmLog = confirmLogMapper.selectById(txNo); if (confirmLog != null){ return ; } // 2、根据账户id查询账户 UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo()); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); baseMapper.updateById(userAccount); // 3、 确认日志记录 ConfirmLog confirmLog1 = new ConfirmLog(); confirmLog1.setTxNo(userAccountDTO.getTxNo()); confirmLog1.setCreateTime(LocalDateTime.now()); confirmLogMapper.insert(confirmLog1); }
编写转入微服务Confirm阶段
/** * 确认阶段 * @param userAccountDTO */ public void sayConfrim(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank02 的Confrim方法 ,事务id={}",txNo); // 1、幂等处理 ConfirmLog confirmLog = confirmLogMapper.selectById(txNo); if (confirmLog != null) { return; } // 2、根据账户id查询账户 UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo()); userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal())); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); userAccountMapper.updateById(userAccount); // 3、 确认日志记录 ConfirmLog confirmLog1 = new ConfirmLog(); confirmLog1.setTxNo(userAccountDTO.getTxNo()); confirmLog1.setCreateTime(LocalDateTime.now()); confirmLogMapper.insert(confirmLog1); }
Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段
转入微服务Cananl阶段
/** * 回滚 * @param userAccountDto */ @Transactional(rollbackFor = Exception.class) public void cancelMethod(UserAccountDto userAccountDto){ String txNo = userAccountDto.getTxNo(); log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto)); CancelLog cancelLog = iCancelLogService.findByTxNo(txNo); if(cancelLog != null){ log.info("bank02已经执行过Cancel方法,txNo:{}", txNo); return; } // 保存记录 iCancelLogService.saveCancelLog(txNo); userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(), userAccountDto.getTargetAccountNo()); }
转出微服务Cancel阶段
/** * 取消阶段 * @param userAccountDTO */ public void sayCancel(UserAccountDTO userAccountDTO) { String txNo = userAccountDTO.getTxNo(); log.info("********** 执行bank01 的 Cancel方法 ,事务id={}",txNo); // 1. 幂等处理 CancelLog cancelLog = cancelLogMapper.selectById(txNo); if (cancelLog != null ){ return; } // 2、根据账户id查询账户 UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo()); userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal())); userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal())); baseMapper.updateById(userAccount); // 3、记录回滚日志 CancelLog cancelLog1 = new CancelLog(); cancelLog1.setTxNo(txNo); cancelLog1.setCreateTime(LocalDateTime.now()); cancelLogMapper.insert(cancelLog1); }
最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务
可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。
两种实现方式:
1、基于本地消息表
2、基于支持分布式事务的消息中间件,如RocketMQ等
基本原理
在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。
可靠消息最终一致性分布式事务实现_本地消息表
本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。
库表设计
订单微服务中出库本地消息表:
基础功能
分析
Task微服务的任务
可靠消息最终一致性分布式事务实现_RocketMQ事务消息
RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。
实现原理
RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。
可靠消息最终一致性分布式事务实战_案列业务介绍
业务介绍
通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。
流程
架构选型
数据库表设计
orders订单数据表
orders数据表存储于tx-msg-orders订单数据库。
DROP TABLE IF EXISTS `orders`; CREATE TABLE `order` ( `id` bigint(20) NOT NULL COMMENT '主键', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', `order_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单 编号', `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id', `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; CREATE TABLE `tx_log` ( `tx_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
stock库存数据表
DROP TABLE IF EXISTS `stock`; CREATE TABLE `stock` ( `id` bigint(20) NOT NULL COMMENT '主键id', `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id', `total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; -- ---------------------------- -- Table structure for tx_log -- ---------------------------- DROP TABLE IF EXISTS `tx_log`; CREATE TABLE `tx_log` ( `tx_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
tx_log事务记录表
可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ
在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。
安装NameServer
拉取镜像
docker pull rocketmqinc/rocketmq
创建数据存储目录
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
启动NameServer
docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /docker/rocketmq/data/namesrv/logs:/root/logs \ -v /docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv
安装Broker
border配置:创建 broker.conf 配置文件
vim /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的 主从关系 brokerName = broker-a #0表示Master,大于0表示不同的 slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和 异步表示Master和Slave之间同步数据的机 制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷 盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状 态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址 brokerIP1 = 192.168.66.100 #剩余磁盘比例 diskMaxUsedSpaceRatio=99
启动broker
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq -4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
报错:
部署RocketMQ的管理工具
RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装
#创建并启动容器 docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=- Drocketmq.namesrv.addr=192.168.66.100:9876 - Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
关闭防火墙(或者开放端口)
#关闭防火墙 systemctl stop firewalld.service #禁止开机启动 systemctl disable firewalld.service
测试
访问:http://192.168.66.101:8080/#/ (可以切换中文)
可靠消息最终一致性分布式事务实战_实现订单微服务
创建父工程rocketmq-msg
创建订单微服务子工程
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
编写配置文件
server: port: 9090 spring: application: name: tx-msg-stock datasource: url: jdbc:mysql://192.168.66.100:3306/txmsg-order? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876 producer: group: order-group
编写主启动类
/** * 订单微服务启动成功 */ @Slf4j @MapperScan("com.itbaizhan.order.mapper") @SpringBootApplication public class OrderMain9090 { public static void main(String[] args) { SpringApplication.run(OrderMain9090.class,args); log.info("************* 订单微服务启动成功*******"); } }
代码生成
package com.itbaizhan.utils; import com.baomidou.mybatisplus.generator.FastAutoGenerator; import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy; import java.util.Arrays; import java.util.List; public class CodeGenerator { public static void main(String[] args) { FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456") .globalConfig(builder -> { builder.author("itbaizhan")// 设置作者 .commentDate("MMdd") // 注释日期格式 .outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/") .fileOverride(); //覆盖文件 }) // 包配置 .packageConfig(builder -> { builder.parent("com.itbaizhan.orders") // 包名前缀 .entity("entity")//实体类包名 .mapper("mapper")//mapper接口包名 .service("service"); //service包名 }) .strategyConfig(builder -> { // 设置需要生成的表名 builder.addInclude(Arrays.asList("orders","tx_log")) // 开始实体类配置 .entityBuilder() // 开启lombok模型 .enableLombok() //表名下划线转驼峰 .naming(NamingStrategy.underline_to_camel) //列名下划线转驼峰 .columnNaming(NamingStrategy.underline_to_camel); }) .execute(); } }
创建TxMessage类
在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。
@Data @NoArgsConstructor @AllArgsConstructor public class TxMessage implements Serializable { private static final long serialVersionUID = -4704980150056885074L; /** * 商品id */ private Long productId; /** * 商品购买数量 */ private Integer payCount; /** * 全局事务编号 */ private String txNo; }
可靠消息最终一致性分布式事务实战_订单微服务业务层实现
业务逻辑层主要实现了用户提交订单后的业务逻辑。
编写OrderService接口
/** * 添加订单 * @param productId 商品id * @param payCount 购买数量 */ void save(Long productId,Integer payCount); /** * 提交订单同时保存事务信息 */ void submitOrderAndSaveTxNo(TxMessage txMessage); /** * 提交订单 * @param productId 商品id * @param payCount 购买数量 */ void submitOrder(Long productId, Integer payCount);
编写OrderService接口实现
package com.itbaizhan.order.service.impl; import com.alibaba.fastjson.JSONObject; import com.itbaizhan.order.entity.Order; import com.itbaizhan.order.entity.TxLog; import com.itbaizhan.order.mapper.OrderMapper; import com.itbaizhan.order.mapper.TxLogMapper; import com.itbaizhan.order.service.IOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.itbaizhan.order.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.Date; import java.util.UUID; /** * <p> * 服务实现类 * </p> * * @author itbaizhan * @since 05-20 */ @Slf4j @Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService { @Resource RocketMQTemplate rocketMQTemplate; @Resource private TxLogMapper txLogMapper; /** * 添加 * @param productId 商品id * @param payCount 购买数量 */ @Override public void save(Long productId, Integer payCount) { Order order = new Order(); // 订单创建时间 order.setCreateTime(LocalDateTime.now()); // 生产订单编号 order.setOrderNo(UUID.randomUUID().toString().replace("-","")); // 商品id order.setProductId(productId); // 购买数量 order.setPayCount(payCount); baseMapper.insert(order); } @Override @Transactional(rollbackFor = Exception.class) public void submitOrderAndSaveTxNo(TxMessage txMessage) { TxLog txLog = txLogMapper.selectById(txMessage.getTxNo()); if(txLog != null){ log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo()); return; } //生成订单 this.save(txMessage.getProductId(),txMessage.getPayCount()); //生成订单 txLog = new TxLog(); txLog.setTxNo(txMessage.getTxNo()); txLog.setCreateTime(LocalDateTime.now()); //添加事务日志 txLogMapper.insert(txLog); } /** * 提交订单 * @param productId 商品id * @param payCount 购买数量 */ @Override public void submitOrder(Long productId,Integer payCount) { //生成全局分布式序列号 String txNo = UUID.randomUUID().toString(); TxMessage txMessage = new TxMessage(productId, payCount, txNo); JSONObject jsonObject = new JSONObject(); jsonObject.put("txMessage", txMessage); Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build(); //发送事务消息 且该消息不允许消费 tx_order_group: 指定版事务消息组 rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null); } }
可靠消息最终一致性分布式事务实战_订单微服务监听事务消息
执行本地的业务代码
package com.itbaizhan.order.message; import com.alibaba.fastjson.JSONObject; import com.itbaizhan.order.entity.TxLog; import com.itbaizhan.order.mapper.TxLogMapper; import com.itbaizhan.order.service.IOrderService; import com.itbaizhan.order.service.ITxLogService; import com.itbaizhan.order.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author itbaizhan * @version 1.0.0 * @description 监听事务消息 */ @Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "tx_order_group") public class OrderTxMessageListener implements RocketMQLocalTransactionListener { @Autowired private IOrderService orderService; @Resource private TxLogMapper txLogMapper; /** * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。 * 成功返回: RocketMQLocalTransactionState.COMMIT, * 失败返回:RocketMQLocalTransactionState.ROLLBACK */ @Override @Transactional(rollbackFor = Exception.class) public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) { try { log.info("订单微服务执行本地事务"); TxMessage txMessage = this.getTxMessage(msg); //执行本地事务 orderService.submitOrderAndSaveTxNo(txMessage); //提交事务 log.info("订单微服务提交事务"); // COMMIT:即生产者通知Rocket该消息可以消费 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); //异常回滚事务 log.info("订单微服务回滚事务"); // ROLLBACK:即生产者通知Rocket将该消息删除 return RocketMQLocalTransactionState.ROLLBACK; } } private TxMessage getTxMessage(Message msg) { String messageString = new String((byte[]) msg.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String txStr = jsonObject.getString("txMessage"); return JSONObject.parseObject(txStr,TxMessage.class); } }
网络异常消息处理
/** * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地 * 事务是否已经执行成功, * 成功返回: RocketMQLocalTransactionState.COMMIT, * 失败返回:RocketMQLocalTransactionState.ROLLBACK */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("订单微服务查询本地事务"); TxMessage txMessage = this.getTxMessage(msg); // 获取订单的消息 Integer exists = txLogService.isExistsTx(txMessage.getTxNo()); if (exists != null) { // COMMIT:即生产者通知Rocket该消息可以消费 return RocketMQLocalTransactionState.COMMIT; } // UNKNOWN:即生产者通知Rocket继续查询该消息的状态 return RocketMQLocalTransactionState.UNKNOWN; } private TxMessage getTxMessage(Message msg) { String messageString = new String((byte[]) msg.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String txStr = jsonObject.getString("txMessage"); return JSONObject.parseObject(txStr,TxMessage.class); }
可靠消息最终一致性分布式事务实战_实现库存微服务
创建库存微服务tx-msg-stock
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
编写配置文件
server: port: 6060 spring: application: name: tx-msg-stock datasource: url: jdbc:mysql://192.168.66.100:3306/txmsg-stock? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password01: 123456 driver-class-name: com.mysql.cj.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876
编写主启动类
package com.itbaizhan.stock; import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author itbaizhan * @version 1.0.0 * @description 库存微服务启动类 */ @MapperScan("com.itbaizhan.stock.mapper") @Slf4j @SpringBootApplication public class StockServerStarter { public static void main(String[] args) { SpringApplication.run(StockServerStarter.class, args); log.info("**************** 库存服务启动成功 ***********"); } }
代码生成
package com.itbaizhan.utils; import com.baomidou.mybatisplus.generator.FastAutoGenerator; import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy; import java.util.Arrays; import java.util.List; public class CodeGenerator { public static void main(String[] args) { FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456") .globalConfig(builder -> { builder.author("itbaizhan")// 设置作者 .commentDate("MMdd") // 注释日期格式 .outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/") .fileOverride(); //覆盖文件 }) // 包配置 .packageConfig(builder -> { builder.parent("com.itbaizhan.stock") // 包名前缀 .entity("entity")//实体类包名 .mapper("mapper")//mapper接口包名 .service("service"); //service包名 }) .strategyConfig(builder -> { // 设置需要生成的表名 builder.addInclude(Arrays.asList("stock","tx_log")) // 开始实体类配置 .entityBuilder() // 开启lombok模型 .enableLombok() //表名下划线转驼峰 .naming(NamingStrategy.underline_to_camel) //列名下划线转驼峰 .columnNaming(NamingStrategy.underline_to_camel); }) .execute(); } }
编写库存接口
public interface StockService { /** * 根据id查询库存 * @param id * @return */ Stock getStockById(Long id); /** * 扣减库存 */ void decreaseStock(TxMessage txMessage); }
可靠消息最终一致性分布式事务实战_库存微服务业务层实现
库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。
编写库存接口
/** * 扣减库存 */ void decreaseStock(TxMessage txMessage);
库存接口实现类
package com.itbaizhan.stock.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.itbaizhan.stock.entity.Stock; import com.itbaizhan.stock.entity.TxLog; import com.itbaizhan.stock.mapper.StockMapper; import com.itbaizhan.stock.mapper.TxLogMapper; import com.itbaizhan.stock.service.IStockService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.itbaizhan.stock.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; /** * <p> * 服务实现类 * </p> * * @author itbaizhan * @since 05-20 */ @Slf4j @Service public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements IStockService { @Resource private StockMapper stockMapper; @Resource private TxLogMapper txLogMapper; @Transactional @Override public void decreaseStock(TxMessage txMessage) { log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(), txMessage.getPayCount()); //检查是否执行过事务 TxLog txLog = txLogMapper.selectById(txMessage.getTxNo()); if(txLog != null){ log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo()); } // 根据商品id查询库存 QueryWrapper<Stock> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("product_id",txMessage.getProductId()); Stock stock = stockMapper.selectOne(queryWrapper); if(stock.getTotalCount() < txMessage.getPayCount()){ throw new RuntimeException("库存不足"); } // 减库存 stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount()); stockMapper.updateById(stock); //生成订单 txLog = new TxLog(); txLog.setTxNo(txMessage.getTxNo()); txLog.setCreateTime(LocalDateTime.now()); //添加事务日志 txLogMapper.insert(txLog); } }
库存微服务消费者实现
用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。
库存事务消费者
package com.itbaizhan.stock.message; import com.alibaba.fastjson.JSONObject; import com.itbaizhan.stock.service.IStockService; import com.itbaizhan.stock.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author binghe * @version 1.0.0 * @description 库存事务消费者 */ @Component @Slf4j @RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg") public class StockTxMessageConsumer implements RocketMQListener<String> { @Autowired private IStockService stockService; @Override public void onMessage(String message) { log.info("库存微服务开始消费事务消息:{}", message); TxMessage txMessage = this.getTxMessage(message); stockService.decreaseStock(txMessage); } private TxMessage getTxMessage(String msg){ JSONObject jsonObject = JSONObject.parseObject(msg); String txStr = jsonObject.getString("txMessage"); return JSONObject.parseObject(txStr,TxMessage.class); } }
可靠消息最终一致性分布式事务实战_测试程序
查询数据
正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。
分别启动库存和订单微服务
编写控制层接口
@Autowired private IOrderService iOrderService; /** * 创建订单 * @param productId 商品id * @param payCount 购买数量 * @return */ @GetMapping(value = "/submit_order") public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ iOrderService.submitOrder(productId, payCount); return "下单成功"; }
分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1
最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务
最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。
适用场景
最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。
流程图
最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别
最大努力通知型分布式事务解决方案
流程:
1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。
2、接收通知方监听 MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。
5、接收通知方可通过消息校对接口来校对消息的一致性。
最大努力通知型分布式事务_案例业务说明
设计完数据库后,创建tx-notifymsg-account库
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for account_info -- ---------------------------- DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` int(11) NOT NULL COMMENT '主键id', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户', `account_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名', `account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of account_info -- ---------------------------- -- ---------------------------- -- Table structure for pay_info -- ---------------------------- DROP TABLE IF EXISTS `pay_info`; CREATE TABLE `pay_info` ( `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账 户', `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额', `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值 结果', `pay_time` datetime(0) NOT NULL COMMENT '充值 时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of pay_info -- ---------------------------- SET FOREIGN_KEY_CHECKS = 1;
设计完数据库后,创建tx-notifymsg-payment库
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for pay_info -- ---------------------------- DROP TABLE IF EXISTS `pay_info`; CREATE TABLE `pay_info` ( `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水 号', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账 户', `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额', `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值 结果', `pay_time` datetime(0) NOT NULL COMMENT '充值 时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of pay_info -- ---------------------------- SET FOREIGN_KEY_CHECKS = 1;
最大努力通知型分布式事务实战_实现充值微服务
主要实现功能
1、充值接口
2、查询充值结果接口
创建父项目rocketmq-notifymsg
创建子工程
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.1</version> </dependency> <!-- 引入nacos依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId> </dependency> </dependencies>
编写主启动类
@EnableDiscoveryClient @MapperScan("com.itbaizhan.payment.mapper") @SpringBootApplication @Slf4j public class PayMain7071 { public static void main(String[] args) { SpringApplication.run(PayMain7071.class,args); log.info("*********** 充值服务启动成功*********"); } }
编写配置文件
server: port: 7071 spring: cloud: nacos: discovery: server-addr: 192.168.66.100:8848 application: name: tx-notifymsg-pay datasource: url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password01: 123456 driver-class-name: com.mysql.cj.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876 producer: group: payment-group
最大努力通知型分布式事务_充值微服务之业务层实现
充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。
编写充值接口
public interface IPayInfoService extends IService<PayInfo> { /** * 保存充值信息 */ PayInfo savePayInfo(PayInfo payInfo); /** * 查询指定的充值信息 */ PayInfo getPayInfoByTxNo(String txNo); }
充值接口实现
@Slf4j @Service public class PayInfoServiceImpl extends ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService { @Resource private PayInfoMapper payInfoMapper; @Resource private RocketMQTemplate rocketMQTemplate; @Override public PayInfo savePayInfo(PayInfo payInfo) { payInfo.setTxNo(UUID.randomUUID().toString().replace("-","")); payInfo.setPayResult("success"); payInfo.setPayTime(LocalDateTime.now()); int count = payInfoMapper.insert(payInfo); //充值信息保存成功 if(count > 0){ log.info("充值微服务向账户微服务发送结果消息"); //发送消息通知账户微服务 rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo)); return payInfo; } return null; } @Override public PayInfo getPayInfoByTxNo(String txNo) { return baseMapper.selectById(txNo); } }
编写充值接口
@RestController @RequestMapping("/payInfo") public class PayInfoController { @Autowired private IPayInfoService payInfoService; /** * 充值 * @param payInfo * @return */ @GetMapping(value = "/pay_account") public PayInfo pay(PayInfo payInfo){ //生成事务编号 return payInfoService.savePayInfo(payInfo); } /** * 查询充值结果 * @param txNo * @return */ @GetMapping(value = "/query/payresult/{txNo}") public PayInfo payResult(@PathVariable("txNo") String txNo){ return payInfoService.getPayInfoByTxNo(txNo); } }
最大努力通知型分布式事务_实现账户微服务
创建子工程account
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.1</version> </dependency> <!-- 引入Nacos注册中心 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId> </dependency> <!-- 引入OpenFeign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starteropenfeign</artifactId> </dependency> <!-- 引入负载均衡器--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloudloadbalancer</artifactId> </dependency> </dependencies>
编写配置文件
server: port: 7070 spring: cloud: nacos: discovery: server-addr: 192.168.66.100:8848 application: name: tx-notifymsg-account datasource: url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password01: 123456 driver-class-name: com.mysql.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876
最大努力通知型分布式事务_账户微服务之业务层实现
RocketMQ消费充值信息
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg") public class NotifyMsgAccountListener implements RocketMQListener<String> { @Autowired private IAccountInfoService accountInfoService; @Override public void onMessage(String message) { log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message)); //如果是充值成功,则修改账户余额 PayInfo payInfo = JSON.parseObject(message, PayInfo.class); if("success".equals(payInfo.getPayResult())){ accountInfoService.updateAccountBalance(payInfo); } log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo)); } }
编写账户操作接口
/** * 更新账户余额 */ void updateAccountBalance(PayInfo payInfo);
实现账户操作接口
@Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Resource private AccountInfoMapper accountInfoMapper; @Resource private PayInfoMapper payInfoMapper; /** * * @param payInfo */ @Transactional(rollbackFor = Exception.class) @Override public void updateAccountBalance(PayInfo payInfo) { if(payInfoMapper.selectById(payInfo.getTxNo()) != null){ log.info("账户微服务已经处理过当前事务..."); return; } LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo()); //更新账户余额 List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper); if (accountInfos != null && !accountInfos.isEmpty()){ AccountInfo accountInfo = accountInfos.get(0); accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount())); accountInfoMapper.updateById(accountInfo); } //保存充值记录 payInfoMapper.insert(payInfo); } }
最大努力通知型分布式事务_账户微服务远程调用实现
主启动类加Feign注解
@EnableDiscoveryClient @EnableFeignClients @MapperScan("com.itbaizhan.account.mapper") @SpringBootApplication @Slf4j public class AccountMain7070 { public static void main(String[] args) { SpringApplication.run(AccountMain7070.class,args); log.info("*********** AccountMain7070启动成功 *********"); } }
编写远程调用接口
@Service @FeignClient("tx-notifymsg-pay") public interface IPayFeignService { @GetMapping(value = "/payInfo/query/payresult/{txNo}") PayInfo payResult(@PathVariable("txNo") String txNo); }
编写查询账户接口
/** * 查询充值结果 */ PayInfo queryPayResult(String txNo);
实现查询账户信息
/** * 查询结果 * @param txNo * @return */ @Override public PayInfo queryPayResult(String txNo) { try{ return iPayFeignService.payResult(txNo); }catch (Exception e){ log.error("查询充值结果异常:{}", e); } return null; }
编写查询充值结果接口
/** * 主动查询充值结果 * @param txNo * @return */ @GetMapping(value = "/query/payresult/{txNo}") public ResponseEntity result(@PathVariable("txNo") String txNo){ return ResponseEntity.ok(accountInfoService.queryPayResult(txNo)); }
最大努力通知型分布式事务_测试程序
查看account库和payment库数据
启动账户和充值微服务
调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。
账户微服务的日志文件中输出如下信息
可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。
查询充值结果