分布式事物-全面详解(学习总结---从入门到深化)(5)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 编写转出微服务Confirm阶段

Hmily实现TCC事务_转入转出微服务实现Confirm阶段

2345_image_file_copy_417.jpg

编写转出微服务Confirm阶段

    /**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null){
            return ;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

编写转入微服务Confirm阶段

    /**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null) {
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
      userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
      userAccountMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段

转入微服务Cananl阶段

 /**
     * 回滚
     * @param userAccountDto
     */
    @Transactional(rollbackFor = Exception.class)
    public void cancelMethod(UserAccountDto userAccountDto){
        String txNo = userAccountDto.getTxNo();
        log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto));
        CancelLog cancelLog = iCancelLogService.findByTxNo(txNo);
        if(cancelLog != null){
            log.info("bank02已经执行过Cancel方法,txNo:{}", txNo);
            return;
       }
        // 保存记录
       iCancelLogService.saveCancelLog(txNo);
       userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(),
       userAccountDto.getTargetAccountNo());
   }

转出微服务Cancel阶段

    /**
     * 取消阶段
     * @param userAccountDTO
     */
    public void sayCancel(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Cancel方法 ,事务id={}",txNo);
        // 1. 幂等处理
        CancelLog cancelLog = cancelLogMapper.selectById(txNo);
        if (cancelLog != null ){
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、记录回滚日志
        CancelLog cancelLog1 = new CancelLog();
        cancelLog1.setTxNo(txNo);
        cancelLog1.setCreateTime(LocalDateTime.now());
        cancelLogMapper.insert(cancelLog1);
   }

最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务

2345_image_file_copy_418.jpg

可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。

两种实现方式:

1、基于本地消息表

2、基于支持分布式事务的消息中间件,如RocketMQ等

基本原理

2345_image_file_copy_419.jpg

2345_image_file_copy_420.jpg

在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。

可靠消息最终一致性分布式事务实现_本地消息表

2345_image_file_copy_421.jpg

本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。

库表设计

订单微服务中出库本地消息表:

2345_image_file_copy_422.jpg

基础功能

2345_image_file_copy_423.jpg2345_image_file_copy_424.jpg

分析

2345_image_file_copy_425.jpg

Task微服务的任务

2345_image_file_copy_426.jpg

2345_image_file_copy_427.jpg

可靠消息最终一致性分布式事务实现_RocketMQ事务消息

2345_image_file_copy_428.jpg

RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。

实现原理

RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。

2345_image_file_copy_429.jpg

2345_image_file_copy_430.jpg

可靠消息最终一致性分布式事务实战_案列业务介绍

2345_image_file_copy_431.jpg

业务介绍

通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。

2345_image_file_copy_432.jpg

2345_image_file_copy_433.jpg

流程

2345_image_file_copy_434.jpg

架构选型

2345_image_file_copy_435.jpg

数据库表设计

orders订单数据表

orders数据表存储于tx-msg-orders订单数据库。

2345_image_file_copy_436.jpg

DROP TABLE IF EXISTS `orders`;
CREATE TABLE `order`  (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `order_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单
编号',
  `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
  `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',
  PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
CREATE TABLE `tx_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

stock库存数据表

2345_image_file_copy_437.jpg

DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock`  (
  `id` bigint(20) NOT NULL COMMENT '主键id',
  `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
  `total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for tx_log
-- ----------------------------
DROP TABLE IF EXISTS `tx_log`;
CREATE TABLE `tx_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;

tx_log事务记录表

2345_image_file_copy_438.jpg

可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ

2345_image_file_copy_439.jpg

在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。

2345_image_file_copy_440.jpg

2345_image_file_copy_441.jpg

安装NameServer

拉取镜像

docker pull rocketmqinc/rocketmq

创建数据存储目录

mkdir -p /docker/rocketmq/data/namesrv/logs
/docker/rocketmq/data/namesrv/store

启动NameServer

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v
/docker/rocketmq/data/namesrv/logs:/root/logs \
-v
/docker/rocketmq/data/namesrv/store:/root/store
\
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv

2345_image_file_copy_442.jpg

安装Broker

border配置:创建 broker.conf 配置文件

vim /docker/rocketmq/conf/broker.conf

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的
主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的
slave brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和
异步表示Master和Slave之间同步数据的机 制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷
盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状
态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.66.100
#剩余磁盘比例
diskMaxUsedSpaceRatio=99

启动broker

docker run -d --restart=always --name rmqbroker
--link rmqnamesrv:namesrv -p 10911:10911 -p
10909:10909 --privileged=true -v
/docker/rocketmq/data/broker/logs:/root/logs -v
/docker/rocketmq/data/broker/store:/root/store
-v
/docker/rocketmq/conf/broker.conf:/opt/rocketmq
-4.4.0/conf/broker.conf -e
"NAMESRV_ADDR=namesrv:9876" -e
"MAX_POSSIBLE_HEAP=200000000"
rocketmqinc/rocketmq sh mqbroker -c
/opt/rocketmq-4.4.0/conf/broker.conf

2345_image_file_copy_443.jpg

报错:

2345_image_file_copy_444.jpg

2345_image_file_copy_445.jpg

部署RocketMQ的管理工具

RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装

#创建并启动容器
docker run -d --restart=always --name rmqadmin
-e "JAVA_OPTS=-
Drocketmq.namesrv.addr=192.168.66.100:9876 -
Dcom.rocketmq.sendMessageWithVIPChannel=false"
-p 8080:8080 pangliang/rocketmq-console-ng

关闭防火墙(或者开放端口)

#关闭防火墙
systemctl stop firewalld.service
#禁止开机启动
systemctl disable firewalld.service

测试

访问:http://192.168.66.101:8080/#/ (可以切换中文)

2345_image_file_copy_446.jpg

可靠消息最终一致性分布式事务实战_实现订单微服务

2345_image_file_copy_447.jpg

创建父工程rocketmq-msg

2345_image_file_copy_448.jpg

创建订单微服务子工程

2345_image_file_copy_449.jpg

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 9090
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-order?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: order-group

编写主启动类

/**
 * 订单微服务启动成功
 */
@Slf4j
@MapperScan("com.itbaizhan.order.mapper")
@SpringBootApplication
public class OrderMain9090 {
    public static void main(String[] args) {
      SpringApplication.run(OrderMain9090.class,args);
        log.info("************* 订单微服务启动成功*******");
   }
}

代码生成

package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
               .globalConfig(builder -> {
                    builder.author("itbaizhan")// 设置作者
                           .commentDate("MMdd") // 注释日期格式
                           .outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/")
                           .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.itbaizhan.orders") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
              .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("orders","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok()
                            //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}

创建TxMessage类

在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxMessage implements Serializable
{
    private static final long serialVersionUID = -4704980150056885074L;
    /**
     * 商品id
     */
    private Long productId;
    /**
     * 商品购买数量
     */
    private Integer payCount;
    /**
     * 全局事务编号
     */
    private String txNo;
}

可靠消息最终一致性分布式事务实战_订单微服务业务层实现

2345_image_file_copy_450.jpg 

业务逻辑层主要实现了用户提交订单后的业务逻辑。

编写OrderService接口

    /**
     * 添加订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void save(Long productId,Integer payCount);
    /**
     * 提交订单同时保存事务信息
     */
    void submitOrderAndSaveTxNo(TxMessage txMessage);
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void submitOrder(Long productId, Integer payCount);

编写OrderService接口实现

package com.itbaizhan.order.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.Order;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.OrderMapper;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class OrderServiceImpl extends
ServiceImpl<OrderMapper, Order> implements IOrderService {
    @Resource
    RocketMQTemplate rocketMQTemplate;
    @Resource
  private TxLogMapper txLogMapper;
    /**
     * 添加
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void save(Long productId, Integer payCount) {
        Order order = new Order();
        // 订单创建时间
        order.setCreateTime(LocalDateTime.now());
        // 生产订单编号
        order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
        // 商品id
        order.setProductId(productId);
        // 购买数量
        order.setPayCount(payCount);
        baseMapper.insert(order);
   }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
    if(txLog != null){
            log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
            return;
       }
        //生成订单
      this.save(txMessage.getProductId(),txMessage.getPayCount());
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void submitOrder(Long productId,Integer payCount) {
        //生成全局分布式序列号
        String txNo = UUID.randomUUID().toString();
        TxMessage txMessage = new TxMessage(productId, payCount, txNo);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("txMessage", txMessage);
        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        //发送事务消息   且该消息不允许消费   
        tx_order_group: 指定版事务消息组
      rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
   }
}

可靠消息最终一致性分布式事务实战_订单微服务监听事务消息

执行本地的业务代码

package com.itbaizhan.order.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.itbaizhan.order.service.ITxLogService;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author itbaizhan
* @version 1.0.0
* @description 监听事务消息
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements
RocketMQLocalTransactionListener {
    @Autowired
    private IOrderService orderService;
    @Resource
    private TxLogMapper txLogMapper;
    /**
     * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) 
      {
        try {
            log.info("订单微服务执行本地事务");
            TxMessage txMessage = this.getTxMessage(msg);
            //执行本地事务
            orderService.submitOrderAndSaveTxNo(txMessage);
            //提交事务
            log.info("订单微服务提交事务");
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       } catch (Exception e) {
            e.printStackTrace();
            //异常回滚事务
            log.info("订单微服务回滚事务");
            // ROLLBACK:即生产者通知Rocket将该消息删除
            return RocketMQLocalTransactionState.ROLLBACK;
       }
   }
    private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}

网络异常消息处理

    /**
     * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地 
     * 事务是否已经执行成功,
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("订单微服务查询本地事务");
        TxMessage txMessage = this.getTxMessage(msg);
        // 获取订单的消息
        Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
        if (exists != null) {
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       }
        // UNKNOWN:即生产者通知Rocket继续查询该消息的状态
        return RocketMQLocalTransactionState.UNKNOWN;
}
   private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
     }

可靠消息最终一致性分布式事务实战_实现库存微服务

创建库存微服务tx-msg-stock

2345_image_file_copy_451.jpg

引入依赖

   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 6060
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876

编写主启动类

package com.itbaizhan.stock;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author itbaizhan
* @version 1.0.0
* @description 库存微服务启动类
*/
@MapperScan("com.itbaizhan.stock.mapper")
@Slf4j
@SpringBootApplication
public class StockServerStarter {
    public static void main(String[] args) {
      SpringApplication.run(StockServerStarter.class, args);
        log.info("**************** 库存服务启动成功 ***********");
   }
}

代码生成

package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456")
               .globalConfig(builder -> { builder.author("itbaizhan")// 设置作者
                     .commentDate("MMdd") // 注释日期格式
                     .outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/")
                      .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.itbaizhan.stock") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
               .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("stock","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok() //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}

编写库存接口

public interface StockService {
    /**
     * 根据id查询库存
     * @param id
     * @return
     */
    Stock getStockById(Long id);
    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);
}

可靠消息最终一致性分布式事务实战_库存微服务业务层实现

2345_image_file_copy_452.jpg 

库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。

编写库存接口

    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);

库存接口实现类

package com.itbaizhan.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.itbaizhan.stock.entity.Stock;
import com.itbaizhan.stock.entity.TxLog;
import com.itbaizhan.stock.mapper.StockMapper;
import com.itbaizhan.stock.mapper.TxLogMapper;
import com.itbaizhan.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
    @Resource
    private StockMapper stockMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Transactional
    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
        //检查是否执行过事务
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if(txLog != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
       }
        // 根据商品id查询库存
        QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("product_id",txMessage.getProductId());
        Stock stock = stockMapper.selectOne(queryWrapper);
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
       }
        // 减库存
       stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
        stockMapper.updateById(stock);
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
}

库存微服务消费者实现

用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。

库存事务消费者

package com.itbaizhan.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.stock.service.IStockService;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
    @Autowired
    private IStockService stockService;
    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
   }
    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}

可靠消息最终一致性分布式事务实战_测试程序

查询数据

正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。

2345_image_file_copy_453.jpg

分别启动库存和订单微服务

编写控制层接口

@Autowired
    private IOrderService iOrderService;
    /**
     * 创建订单
     * @param productId 商品id
     * @param payCount 购买数量
     * @return
     */
    @GetMapping(value = "/submit_order")
    public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ 
        iOrderService.submitOrder(productId, payCount);
        return "下单成功";
   }

分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1

2345_image_file_copy_454.jpg

最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务

2345_image_file_copy_455.jpg

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。

适用场景

最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。

流程图

2345_image_file_copy_456.jpg

2345_image_file_copy_457.jpg

2345_image_file_copy_458.jpg

最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别

2345_image_file_copy_459.jpg

2345_image_file_copy_460.jpg

最大努力通知型分布式事务解决方案

2345_image_file_copy_461.jpg

2345_image_file_copy_462.jpg

2345_image_file_copy_463.jpg

流程:

1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。

2、接收通知方监听 MQ。

3、接收通知方接收消息,业务处理完成回应ack。

4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。

5、接收通知方可通过消息校对接口来校对消息的一致性。

2345_image_file_copy_464.jpg

2345_image_file_copy_465.jpg

最大努力通知型分布式事务_案例业务说明

2345_image_file_copy_466.jpg

2345_image_file_copy_467.jpg

2345_image_file_copy_468.jpg

2345_image_file_copy_469.jpg

设计完数据库后,创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` int(11) NOT NULL COMMENT '主键id',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
  `pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

设计完数据库后,创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

最大努力通知型分布式事务实战_实现充值微服务

2345_image_file_copy_470.jpg

主要实现功能

1、充值接口

2、查询充值结果接口

创建父项目rocketmq-notifymsg

2345_image_file_copy_471.jpg

创建子工程

2345_image_file_copy_472.jpg

引入依赖

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- 引入nacos依赖 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>

编写主启动类

@EnableDiscoveryClient
@MapperScan("com.itbaizhan.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
    public static void main(String[] args) {
       SpringApplication.run(PayMain7071.class,args);
        log.info("*********** 充值服务启动成功*********");
   }
}

编写配置文件

server:
 port: 7071
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-pay
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: payment-group

最大努力通知型分布式事务_充值微服务之业务层实现

2345_image_file_copy_473.jpg

充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。

编写充值接口

public interface IPayInfoService extends IService<PayInfo> {
    /**
     * 保存充值信息
     */
    PayInfo savePayInfo(PayInfo payInfo);
    /**
     * 查询指定的充值信息
     */
    PayInfo getPayInfoByTxNo(String txNo);
}

充值接口实现

@Slf4j
@Service
public class PayInfoServiceImpl extends
ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
    @Resource
    private PayInfoMapper payInfoMapper;
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public PayInfo savePayInfo(PayInfo payInfo)
{
      payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
        payInfo.setPayResult("success");
      payInfo.setPayTime(LocalDateTime.now());
        int count = payInfoMapper.insert(payInfo);
        //充值信息保存成功
        if(count > 0){
            log.info("充值微服务向账户微服务发送结果消息");
            //发送消息通知账户微服务
            rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo));
            return payInfo;
       }
        return null;
   }
    @Override
    public PayInfo getPayInfoByTxNo(String txNo) {
        return baseMapper.selectById(txNo);
   }
}

编写充值接口

@RestController
@RequestMapping("/payInfo")
public class PayInfoController {
    @Autowired
    private IPayInfoService payInfoService;
    /**
     * 充值
     * @param payInfo
     * @return
     */
    @GetMapping(value = "/pay_account")
    public PayInfo pay(PayInfo payInfo){
        //生成事务编号
        return payInfoService.savePayInfo(payInfo);
   }
    /**
     * 查询充值结果
     * @param txNo
     * @return
     */
    @GetMapping(value = "/query/payresult/{txNo}")
    public PayInfo payResult(@PathVariable("txNo") String txNo){
        return payInfoService.getPayInfoByTxNo(txNo);
   }
}

最大努力通知型分布式事务_实现账户微服务

创建子工程account

2345_image_file_copy_474.jpg

引入依赖

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!--   引入Nacos注册中心   -->
        <dependency>
           <groupId>com.alibaba.cloud</groupId>
           <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
        <!--   引入OpenFeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starteropenfeign</artifactId>
        </dependency>
        <!--   引入负载均衡器-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloudloadbalancer</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 7070
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-account
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876

最大努力通知型分布式事务_账户微服务之业务层实现

2345_image_file_copy_475.jpg

RocketMQ消费充值信息

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
    @Autowired
    private IAccountInfoService accountInfoService;
    @Override
    public void onMessage(String message) {
        log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message));
        //如果是充值成功,则修改账户余额
        PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
      if("success".equals(payInfo.getPayResult())){
           accountInfoService.updateAccountBalance(payInfo);
       }
        log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
   }
}

编写账户操作接口

    /**
     * 更新账户余额
     */
    void updateAccountBalance(PayInfo payInfo);

实现账户操作接口

@Slf4j
@Service
public class AccountInfoServiceImpl extends
ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Resource
    private AccountInfoMapper accountInfoMapper;
    @Resource
    private PayInfoMapper payInfoMapper;
    /**
     *
     * @param payInfo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateAccountBalance(PayInfo payInfo) {
        if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
            log.info("账户微服务已经处理过当前事务...");
            return;
       }
        LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
        lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
        //更新账户余额
        List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
        if (accountInfos != null && !accountInfos.isEmpty()){
            AccountInfo accountInfo = accountInfos.get(0);
            accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
            accountInfoMapper.updateById(accountInfo);
       }
        //保存充值记录
        payInfoMapper.insert(payInfo);
   }
}

最大努力通知型分布式事务_账户微服务远程调用实现

2345_image_file_copy_476.jpg

主启动类加Feign注解

@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.itbaizhan.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
    public static void main(String[] args) {
       SpringApplication.run(AccountMain7070.class,args);
        log.info("*********** AccountMain7070启动成功 *********");
   }
}

编写远程调用接口

@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
    @GetMapping(value = "/payInfo/query/payresult/{txNo}")
    PayInfo payResult(@PathVariable("txNo") String txNo);
}

编写查询账户接口

    /**
     * 查询充值结果
     */
    PayInfo queryPayResult(String txNo);

实现查询账户信息

    /**
     * 查询结果
     * @param txNo
     * @return
     */
    @Override
    public PayInfo queryPayResult(String txNo)
      {
        try{
            return iPayFeignService.payResult(txNo);
       }catch (Exception e){
            log.error("查询充值结果异常:{}", e);
       }
        return null;
   }

编写查询充值结果接口

    /**
     * 主动查询充值结果
     * @param txNo
     * @return
     */
  @GetMapping(value = "/query/payresult/{txNo}")
    public ResponseEntity result(@PathVariable("txNo") String txNo){
        return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
   }

最大努力通知型分布式事务_测试程序

查看account库和payment库数据

2345_image_file_copy_477.jpg

启动账户和充值微服务

调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。

2345_image_file_copy_478.jpg

账户微服务的日志文件中输出如下信息

2345_image_file_copy_479.jpg

可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。

查询充值结果

2345_image_file_copy_480.jpg

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
516 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
142 4
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
272 0
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
4月前
|
Dubbo Java 应用服务中间件
分布式-dubbo的入门
分布式-dubbo的入门
|
4月前
|
机器学习/深度学习 算法 自动驾驶
深度学习之分布式智能体学习
基于深度学习的分布式智能体学习是一种针对多智能体系统的机器学习方法,旨在通过多个智能体协作、分布式决策和学习来解决复杂任务。这种方法特别适用于具有大规模数据、分散计算资源、或需要智能体彼此交互的应用场景。
244 4
|
5月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
672 2
|
8月前
|
并行计算 算法 物联网
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
|
7月前
|
存储 搜索推荐 Java
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
99 2
|
7月前
|
负载均衡 NoSQL 关系型数据库
Redis分布式锁学习总结
Redis分布式锁学习总结
42 0

热门文章

最新文章