分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(上)

简介: 分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)

可靠消息最终一致性分布式事务实战_库存微服务业务层实现



库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。


编写库存接口

    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);


库存接口实现类

package com.tong.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tong.stock.entity.Stock;
import com.tong.stock.entity.TxLog;
import com.tong.stock.mapper.StockMapper;
import com.tong.stock.mapper.TxLogMapper;
import com.tong.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author tong
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
    @Resource
    private StockMapper stockMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Transactional
    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
        //检查是否执行过事务
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if(txLog != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
       }
        // 根据商品id查询库存
        QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("product_id",txMessage.getProductId());
        Stock stock = stockMapper.selectOne(queryWrapper);
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
       }
        // 减库存
       stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
        stockMapper.updateById(stock);
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
}


库存微服务消费者实现


用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。


库存事务消费者

package com.tong.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.tong.stock.service.IStockService;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
    @Autowired
    private IStockService stockService;
    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
   }
    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}


可靠消息最终一致性分布式事务实战_测试程序


查询数据


正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。


分别启动库存和订单微服务


编写控制层接口

@Autowired
    private IOrderService iOrderService;
    /**
     * 创建订单
     * @param productId 商品id
     * @param payCount 购买数量
     * @return
     */
    @GetMapping(value = "/submit_order")
    public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ 
        iOrderService.submitOrder(productId, payCount);
        return "下单成功";
   }


分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1

最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务



最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。


适用场景


最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。


流程图

最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别


最大努力通知型分布式事务解决方案


流程:


1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。

2、接收通知方监听 MQ。

3、接收通知方接收消息,业务处理完成回应ack。

4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。

5、接收通知方可通过消息校对接口来校对消息的一致性。

最大努力通知型分布式事务_案例业务说明



设计完数据库后,创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` int(11) NOT NULL COMMENT '主键id',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
  `pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;


设计完数据库后,创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;


分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下):https://developer.aliyun.com/article/1419999

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
领域建模 数据库 数据安全/隐私保护
DailyMart03:如何基于DDD设计商城的领域模型?
DailyMart03:如何基于DDD设计商城的领域模型?
1082 0
|
9月前
|
消息中间件 数据库 RocketMQ
分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)
分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)
188 0
|
9月前
|
消息中间件 Java 数据库
秒杀系统库存超卖问题:从传统解决方案到引入RabbitMQ
秒杀系统库存超卖问题:从传统解决方案到引入RabbitMQ
528 0
|
9月前
|
消息中间件 RocketMQ 微服务
分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下)
分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)
80 0
|
9月前
|
缓存 NoSQL Java
大型生鲜系统库存负数问题解决方法:技术选型与实际应用
大型生鲜系统库存负数问题解决方法:技术选型与实际应用
73 0
|
消息中间件
分布式事务解决方案之一:MQ异步确保事务
分布式事务解决方案之一:MQ异步确保事务
|
网络协议 安全
内网接入外网的几种方式
内网接入外网的几种方式
880 0
|
域名解析 缓存 运维
简谈 CDN “调度异常” 导致的系列问题
## 背景 为什么要谈这个话题?缘由于现在 CDN 的广泛应用在企业客户,但是很多甲方的运维或者工程师对 CDN 的调度以及 DNS 的调度原理并不清楚,基本遇到问题也无法判断到底是否问题在 CDN 。甚至用户自己的使用规范不对也会埋怨是 CDN 的问题。今天简单根据几个案例聊下所谓 “调度不准” 的一系列疑问。 ## 购买 CDN 加速服务后的解析流程 先了解几个概念,我们在通过一张
1710 0
简谈 CDN “调度异常” 导致的系列问题
|
JavaScript
Vue侦听器Watch
Watch是Vue.js提供的一个观察者模式,用于监听数据的变化并执行相应的回调函数。虽然计算属性Computed在大多数情况下更合适,但有时也需要一个自定义的侦听器Watch。因为在有些情况下,我们需要在状态变化时执行一些“副作用”:例如更改 DOM,或是根据异步操作的结果去修改另一处的状态。
【Hexo】butterfly主题添加备案信息
【Hexo】butterfly主题添加备案信息
【Hexo】butterfly主题添加备案信息

热门文章

最新文章