分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)



可靠消息最终一致性分布式事务实战_库存微服务业务层实现

库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。

编写库存接口

/**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);

库存接口实现类

package com.tong.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tong.stock.entity.Stock;
import com.tong.stock.entity.TxLog;
import com.tong.stock.mapper.StockMapper;
import com.tong.stock.mapper.TxLogMapper;
import com.tong.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author tong
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
    @Resource
    private StockMapper stockMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Transactional
    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
        //检查是否执行过事务
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if(txLog != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
       }
        // 根据商品id查询库存
        QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("product_id",txMessage.getProductId());
        Stock stock = stockMapper.selectOne(queryWrapper);
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
       }
        // 减库存
       stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
        stockMapper.updateById(stock);
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
}

库存微服务消费者实现

用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。

库存事务消费者

package com.tong.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.tong.stock.service.IStockService;
import com.tong.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
    @Autowired
    private IStockService stockService;
    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
   }
    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}

可靠消息最终一致性分布式事务实战_测试程序

查询数据

正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。

分别启动库存和订单微服务

编写控制层接口

@Autowired
    private IOrderService iOrderService;
    /**
     * 创建订单
     * @param productId 商品id
     * @param payCount 购买数量
     * @return
     */
    @GetMapping(value = "/submit_order")
    public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ 
        iOrderService.submitOrder(productId, payCount);
        return "下单成功";
   }

分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1

最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。

适用场景

最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。

流程图

最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别

最大努力通知型分布式事务解决方案

流程:

1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。

2、接收通知方监听 MQ。

3、接收通知方接收消息,业务处理完成回应ack。

4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。

5、接收通知方可通过消息校对接口来校对消息的一致性。

最大努力通知型分布式事务_案例业务说明

设计完数据库后,创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` int(11) NOT NULL COMMENT '主键id',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
  `pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

设计完数据库后,创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

最大努力通知型分布式事务实战_实现充值微服务

主要实现功能

1、充值接口

2、查询充值结果接口

创建父项目rocketmq-notifymsg

创建子工程

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- 引入nacos依赖 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>

编写主启动类

@EnableDiscoveryClient
@MapperScan("com.tong.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
    public static void main(String[] args) {
       SpringApplication.run(PayMain7071.class,args);
        log.info("*********** 充值服务启动成功*********");
   }
}

编写配置文件

server:
 port: 7071
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-pay
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: payment-group

最大努力通知型分布式事务_充值微服务之业务层实现

充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。

编写充值接口

public interface IPayInfoService extends IService<PayInfo> {
    /**
     * 保存充值信息
     */
    PayInfo savePayInfo(PayInfo payInfo);
    /**
     * 查询指定的充值信息
     */
    PayInfo getPayInfoByTxNo(String txNo);
}

充值接口实现

@Slf4j
@Service
public class PayInfoServiceImpl extends
ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
    @Resource
    private PayInfoMapper payInfoMapper;
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public PayInfo savePayInfo(PayInfo payInfo)
{
      payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
        payInfo.setPayResult("success");
      payInfo.setPayTime(LocalDateTime.now());
        int count = payInfoMapper.insert(payInfo);
        //充值信息保存成功
        if(count > 0){
            log.info("充值微服务向账户微服务发送结果消息");
            //发送消息通知账户微服务
            rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo));
            return payInfo;
       }
        return null;
   }
    @Override
    public PayInfo getPayInfoByTxNo(String txNo) {
        return baseMapper.selectById(txNo);
   }
}

编写充值接口

@RestController
@RequestMapping("/payInfo")
public class PayInfoController {
    @Autowired
    private IPayInfoService payInfoService;
    /**
     * 充值
     * @param payInfo
     * @return
     */
    @GetMapping(value = "/pay_account")
    public PayInfo pay(PayInfo payInfo){
        //生成事务编号
        return payInfoService.savePayInfo(payInfo);
   }
    /**
     * 查询充值结果
     * @param txNo
     * @return
     */
    @GetMapping(value = "/query/payresult/{txNo}")
    public PayInfo payResult(@PathVariable("txNo") String txNo){
        return payInfoService.getPayInfoByTxNo(txNo);
   }
}

最大努力通知型分布式事务_实现账户微服务

创建子工程account

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!--   引入Nacos注册中心   -->
        <dependency>
           <groupId>com.alibaba.cloud</groupId>
           <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
        <!--   引入OpenFeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starteropenfeign</artifactId>
        </dependency>
        <!--   引入负载均衡器-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloudloadbalancer</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 7070
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-account
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876

最大努力通知型分布式事务_账户微服务之业务层实现

RocketMQ消费充值信息

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
    @Autowired
    private IAccountInfoService accountInfoService;
    @Override
    public void onMessage(String message) {
        log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message));
        //如果是充值成功,则修改账户余额
        PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
      if("success".equals(payInfo.getPayResult())){
           accountInfoService.updateAccountBalance(payInfo);
       }
        log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
   }
}

编写账户操作接口

/**
     * 更新账户余额
     */
    void updateAccountBalance(PayInfo payInfo);

实现账户操作接口

@Slf4j
@Service
public class AccountInfoServiceImpl extends
ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Resource
    private AccountInfoMapper accountInfoMapper;
    @Resource
    private PayInfoMapper payInfoMapper;
    /**
     *
     * @param payInfo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateAccountBalance(PayInfo payInfo) {
        if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
            log.info("账户微服务已经处理过当前事务...");
            return;
       }
        LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
        lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
        //更新账户余额
        List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
        if (accountInfos != null && !accountInfos.isEmpty()){
            AccountInfo accountInfo = accountInfos.get(0);
            accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
            accountInfoMapper.updateById(accountInfo);
       }
        //保存充值记录
        payInfoMapper.insert(payInfo);
   }
}

最大努力通知型分布式事务_账户微服务远程调用实现

主启动类加Feign注解

@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.tong.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
    public static void main(String[] args) {
       SpringApplication.run(AccountMain7070.class,args);
        log.info("*********** AccountMain7070启动成功 *********");
   }
}

编写远程调用接口

@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
    @GetMapping(value = "/payInfo/query/payresult/{txNo}")
    PayInfo payResult(@PathVariable("txNo") String txNo);
}

编写查询账户接口

/**
     * 查询充值结果
     */
    PayInfo queryPayResult(String txNo);

实现查询账户信息

/**
     * 查询结果
     * @param txNo
     * @return
     */
    @Override
    public PayInfo queryPayResult(String txNo)
      {
        try{
            return iPayFeignService.payResult(txNo);
       }catch (Exception e){
            log.error("查询充值结果异常:{}", e);
       }
        return null;
   }

编写查询充值结果接口

/**
     * 主动查询充值结果
     * @param txNo
     * @return
     */
  @GetMapping(value = "/query/payresult/{txNo}")
    public ResponseEntity result(@PathVariable("txNo") String txNo){
        return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
   }

最大努力通知型分布式事务_测试程序

查看account库和payment库数据

启动账户和充值微服务

调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。

账户微服务的日志文件中输出如下信息

可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。

查询充值结果

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
3天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
298 0
|
1月前
|
NoSQL 算法 安全
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
155 0
|
1月前
|
NoSQL 关系型数据库 MySQL
分布式锁(redis/mysql)
分布式锁(redis/mysql)
61 1
|
3月前
|
NoSQL Java 测试技术
字节二面:Spring Boot Redis 可重入分布式锁实现原理?
字节二面:Spring Boot Redis 可重入分布式锁实现原理?
163 1
|
3月前
|
存储 缓存 NoSQL
【分布式】Redis与Memcache的对比分析
【1月更文挑战第25天】【分布式】Redis与Memcache的对比分析
|
3月前
|
监控 NoSQL Linux
【分布式】Redis的持久化方案解析
【1月更文挑战第25天】【分布式】Redis的持久化方案解析
|
1月前
|
NoSQL Java Redis
如何通俗易懂的理解Redis分布式锁
在多线程并发的情况下,我们如何保证一个代码块在同一时间只能由一个线程访问呢?
38 2
|
1月前
|
缓存 NoSQL Java
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
62 0

热门文章

最新文章