可靠消息最终一致性分布式事务实战_库存微服务业务层实现
库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。
编写库存接口
/** * 扣减库存 */ void decreaseStock(TxMessage txMessage);
库存接口实现类
package com.tong.stock.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.tong.stock.entity.Stock; import com.tong.stock.entity.TxLog; import com.tong.stock.mapper.StockMapper; import com.tong.stock.mapper.TxLogMapper; import com.tong.stock.service.IStockService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.tong.stock.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; /** * <p> * 服务实现类 * </p> * * @author tong * @since 05-20 */ @Slf4j @Service public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements IStockService { @Resource private StockMapper stockMapper; @Resource private TxLogMapper txLogMapper; @Transactional @Override public void decreaseStock(TxMessage txMessage) { log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(), txMessage.getPayCount()); //检查是否执行过事务 TxLog txLog = txLogMapper.selectById(txMessage.getTxNo()); if(txLog != null){ log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo()); } // 根据商品id查询库存 QueryWrapper<Stock> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("product_id",txMessage.getProductId()); Stock stock = stockMapper.selectOne(queryWrapper); if(stock.getTotalCount() < txMessage.getPayCount()){ throw new RuntimeException("库存不足"); } // 减库存 stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount()); stockMapper.updateById(stock); //生成订单 txLog = new TxLog(); txLog.setTxNo(txMessage.getTxNo()); txLog.setCreateTime(LocalDateTime.now()); //添加事务日志 txLogMapper.insert(txLog); } }
库存微服务消费者实现
用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。
库存事务消费者
package com.tong.stock.message; import com.alibaba.fastjson.JSONObject; import com.tong.stock.service.IStockService; import com.tong.stock.tx.TxMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author binghe * @version 1.0.0 * @description 库存事务消费者 */ @Component @Slf4j @RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg") public class StockTxMessageConsumer implements RocketMQListener<String> { @Autowired private IStockService stockService; @Override public void onMessage(String message) { log.info("库存微服务开始消费事务消息:{}", message); TxMessage txMessage = this.getTxMessage(message); stockService.decreaseStock(txMessage); } private TxMessage getTxMessage(String msg){ JSONObject jsonObject = JSONObject.parseObject(msg); String txStr = jsonObject.getString("txMessage"); return JSONObject.parseObject(txStr,TxMessage.class); } }
可靠消息最终一致性分布式事务实战_测试程序
查询数据
正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。
分别启动库存和订单微服务
编写控制层接口
@Autowired private IOrderService iOrderService; /** * 创建订单 * @param productId 商品id * @param payCount 购买数量 * @return */ @GetMapping(value = "/submit_order") public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ iOrderService.submitOrder(productId, payCount); return "下单成功"; }
分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1
最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务
最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。
适用场景
最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。
流程图
最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别
最大努力通知型分布式事务解决方案
流程:
1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。
2、接收通知方监听 MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。
5、接收通知方可通过消息校对接口来校对消息的一致性。
最大努力通知型分布式事务_案例业务说明
设计完数据库后,创建tx-notifymsg-account库
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for account_info -- ---------------------------- DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` int(11) NOT NULL COMMENT '主键id', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户', `account_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名', `account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of account_info -- ---------------------------- -- ---------------------------- -- Table structure for pay_info -- ---------------------------- DROP TABLE IF EXISTS `pay_info`; CREATE TABLE `pay_info` ( `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账 户', `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额', `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值 结果', `pay_time` datetime(0) NOT NULL COMMENT '充值 时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of pay_info -- ---------------------------- SET FOREIGN_KEY_CHECKS = 1;
设计完数据库后,创建tx-notifymsg-payment库
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for pay_info -- ---------------------------- DROP TABLE IF EXISTS `pay_info`; CREATE TABLE `pay_info` ( `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水 号', `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账 户', `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额', `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值 结果', `pay_time` datetime(0) NOT NULL COMMENT '充值 时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of pay_info -- ---------------------------- SET FOREIGN_KEY_CHECKS = 1;
分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下):https://developer.aliyun.com/article/1419999