分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(上)

简介: 分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)

可靠消息最终一致性分布式事务实战_库存微服务业务层实现



库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。


编写库存接口

    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);


库存接口实现类

package com.tong.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tong.stock.entity.Stock;
import com.tong.stock.entity.TxLog;
import com.tong.stock.mapper.StockMapper;
import com.tong.stock.mapper.TxLogMapper;
import com.tong.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author tong
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
    @Resource
    private StockMapper stockMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Transactional
    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
        //检查是否执行过事务
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if(txLog != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
       }
        // 根据商品id查询库存
        QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("product_id",txMessage.getProductId());
        Stock stock = stockMapper.selectOne(queryWrapper);
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
       }
        // 减库存
       stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
        stockMapper.updateById(stock);
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
}


库存微服务消费者实现


用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。


库存事务消费者

package com.tong.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.tong.stock.service.IStockService;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
    @Autowired
    private IStockService stockService;
    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
   }
    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}


可靠消息最终一致性分布式事务实战_测试程序


查询数据


正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。


分别启动库存和订单微服务


编写控制层接口

@Autowired
    private IOrderService iOrderService;
    /**
     * 创建订单
     * @param productId 商品id
     * @param payCount 购买数量
     * @return
     */
    @GetMapping(value = "/submit_order")
    public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ 
        iOrderService.submitOrder(productId, payCount);
        return "下单成功";
   }


分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1

最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务



最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。


适用场景


最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。


流程图

最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别


最大努力通知型分布式事务解决方案


流程:


1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。

2、接收通知方监听 MQ。

3、接收通知方接收消息,业务处理完成回应ack。

4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。

5、接收通知方可通过消息校对接口来校对消息的一致性。

最大努力通知型分布式事务_案例业务说明



设计完数据库后,创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` int(11) NOT NULL COMMENT '主键id',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
  `pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;


设计完数据库后,创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;


分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下):https://developer.aliyun.com/article/1419999

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2天前
|
NoSQL 关系型数据库 MySQL
分布式系统学习9:分布式锁
本文介绍了分布式系统中分布式锁的概念、实现方式及其应用场景。分布式锁用于在多个独立的JVM进程间确保资源的互斥访问,具备互斥、高可用、可重入和超时机制等特点。文章详细讲解了三种常见的分布式锁实现方式:基于Redis、Zookeeper和关系型数据库(如MySQL)。其中,Redis适合高性能场景,推荐使用Redisson库;Zookeeper适用于对一致性要求较高的场景,建议基于Curator框架实现;而基于数据库的方式性能较低,实际开发中较少使用。此外,还探讨了乐观锁和悲观锁的区别及适用场景,并介绍了如何通过Lua脚本和Redis的`SET`命令实现原子操作,以及Redisson的自动续期机
37 7
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
21天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
48 1
|
1月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
104 1
|
2月前
|
Kubernetes Cloud Native 开发者
云原生入门:从容器到微服务
本文将带你走进云原生的世界,从容器技术开始,逐步深入到微服务架构。我们将通过实际代码示例,展示如何利用云原生技术构建和部署应用。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的信息和启示。
|
2月前
|
Cloud Native 持续交付 云计算
云原生入门指南:从容器到微服务
【10月更文挑战第28天】在数字化转型的浪潮中,云原生技术成为推动现代软件开发的关键力量。本篇文章将带你了解云原生的基本概念,探索它如何通过容器化、微服务架构以及持续集成和持续部署(CI/CD)的实践来提升应用的可伸缩性、灵活性和可靠性。你将学习到如何利用这些技术构建和部署在云端高效运行的应用,并理解它们对DevOps文化的贡献。
77 2
|
2月前
|
Kubernetes 关系型数据库 MySQL
Kubernetes入门:搭建高可用微服务架构
【10月更文挑战第25天】在快速发展的云计算时代,微服务架构因其灵活性和可扩展性备受青睐。本文通过一个案例分析,展示了如何使用Kubernetes将传统Java Web应用迁移到Kubernetes平台并改造成微服务架构。通过定义Kubernetes服务、创建MySQL的Deployment/RC、改造Web应用以及部署Web应用,最终实现了高可用的微服务架构。Kubernetes不仅提供了服务发现和负载均衡的能力,还通过各种资源管理工具,提升了系统的可扩展性和容错性。
169 3
|
3月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
3月前
|
消息中间件 存储 负载均衡
微服务与分布式系统设计看这篇就够了!
【10月更文挑战第12天】 在现代软件架构中,微服务和分布式系统设计已经成为构建可扩展、灵活和可靠应用程序的主流方法。本文将深入探讨微服务架构的核心概念、设计原则和挑战,并提供一些关于如何在分布式系统中实现微服务的实用指导。
133 2

热门文章

最新文章