分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)

分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上):https://developer.aliyun.com/article/1419990


编写配置文件

server:
 port: 9090
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-order?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: order-group


编写主启动类

/**
 * 订单微服务启动成功
 */
@Slf4j
@MapperScan("com.tong.order.mapper")
@SpringBootApplication
public class OrderMain9090 {
    public static void main(String[] args) {
      SpringApplication.run(OrderMain9090.class,args);
        log.info("************* 订单微服务启动成功*******");
   }
}


代码生成

package com.tong.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
               .globalConfig(builder -> {
                    builder.author("tong")// 设置作者
                           .commentDate("MMdd") // 注释日期格式
                           .outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/")
                           .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.tong.orders") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
              .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("orders","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok()
                            //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}


创建TxMessage类


在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxMessage implements Serializable
{
    private static final long serialVersionUID = -4704980150056885074L;
    /**
     * 商品id
     */
    private Long productId;
    /**
     * 商品购买数量
     */
    private Integer payCount;
    /**
     * 全局事务编号
     */
    private String txNo;
}


可靠消息最终一致性分布式事务实战_订单微服务业务层实现



业务逻辑层主要实现了用户提交订单后的业务逻辑。


编写OrderService接口

    /**
     * 添加订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void save(Long productId,Integer payCount);
    /**
     * 提交订单同时保存事务信息
     */
    void submitOrderAndSaveTxNo(TxMessage txMessage);
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void submitOrder(Long productId, Integer payCount);


编写OrderService接口实现

package com.itbaizhan.order.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.tong.order.entity.Order;
import com.tong.order.entity.TxLog;
import com.tong.order.mapper.OrderMapper;
import com.tong.order.mapper.TxLogMapper;
import com.tong.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tong.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
* <p>
* 服务实现类
* </p>
*
* @author tong
* @since 05-20
*/
@Slf4j
@Service
public class OrderServiceImpl extends
ServiceImpl<OrderMapper, Order> implements IOrderService {
    @Resource
    RocketMQTemplate rocketMQTemplate;
    @Resource
  private TxLogMapper txLogMapper;
    /**
     * 添加
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void save(Long productId, Integer payCount) {
        Order order = new Order();
        // 订单创建时间
        order.setCreateTime(LocalDateTime.now());
        // 生产订单编号
        order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
        // 商品id
        order.setProductId(productId);
        // 购买数量
        order.setPayCount(payCount);
        baseMapper.insert(order);
   }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
    if(txLog != null){
            log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
            return;
       }
        //生成订单
      this.save(txMessage.getProductId(),txMessage.getPayCount());
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void submitOrder(Long productId,Integer payCount) {
        //生成全局分布式序列号
        String txNo = UUID.randomUUID().toString();
        TxMessage txMessage = new TxMessage(productId, payCount, txNo);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("txMessage", txMessage);
        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        //发送事务消息   且该消息不允许消费   
        tx_order_group: 指定版事务消息组
      rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
   }
}


可靠消息最终一致性分布式事务实战_订单微服务监听事务消息


执行本地的业务代码

package com.tong.order.message;
import com.alibaba.fastjson.JSONObject;
import com.tong.order.entity.TxLog;
import com.tong.order.mapper.TxLogMapper;
import com.tong.order.service.IOrderService;
import com.tong.order.service.ITxLogService;
import com.tong.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author tong
* @version 1.0.0
* @description 监听事务消息
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements
RocketMQLocalTransactionListener {
    @Autowired
    private IOrderService orderService;
    @Resource
    private TxLogMapper txLogMapper;
    /**
     * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) 
      {
        try {
            log.info("订单微服务执行本地事务");
            TxMessage txMessage = this.getTxMessage(msg);
            //执行本地事务
            orderService.submitOrderAndSaveTxNo(txMessage);
            //提交事务
            log.info("订单微服务提交事务");
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       } catch (Exception e) {
            e.printStackTrace();
            //异常回滚事务
            log.info("订单微服务回滚事务");
            // ROLLBACK:即生产者通知Rocket将该消息删除
            return RocketMQLocalTransactionState.ROLLBACK;
       }
   }
    private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}


网络异常消息处理

    /**
     * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地 
     * 事务是否已经执行成功,
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("订单微服务查询本地事务");
        TxMessage txMessage = this.getTxMessage(msg);
        // 获取订单的消息
        Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
        if (exists != null) {
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       }
        // UNKNOWN:即生产者通知Rocket继续查询该消息的状态
        return RocketMQLocalTransactionState.UNKNOWN;
}
   private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
     }


可靠消息最终一致性分布式事务实战_实现库存微服务


创建库存微服务tx-msg-stock


引入依赖

   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>


编写配置文件

server:
 port: 6060
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876


编写主启动类

package com.tong.stock;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author tong
* @version 1.0.0
* @description 库存微服务启动类
*/
@MapperScan("com.tong.stock.mapper")
@Slf4j
@SpringBootApplication
public class StockServerStarter {
    public static void main(String[] args) {
      SpringApplication.run(StockServerStarter.class, args);
        log.info("**************** 库存服务启动成功 ***********");
   }
}


代码生成

package com.tong.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456")
               .globalConfig(builder -> { builder.author("tong")// 设置作者
                     .commentDate("MMdd") // 注释日期格式
                     .outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/")
                      .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.tong.stock") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
               .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("stock","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok() //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}


编写库存接口

public interface StockService {
    /**
     * 根据id查询库存
     * @param id
     * @return
     */
    Stock getStockById(Long id);
    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
Kubernetes Cloud Native 开发者
云原生入门:从容器到微服务
本文将带你走进云原生的世界,从容器技术开始,逐步深入到微服务架构。我们将通过实际代码示例,展示如何利用云原生技术构建和部署应用。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的信息和启示。
|
2月前
|
Cloud Native 持续交付 云计算
云原生入门指南:从容器到微服务
【10月更文挑战第28天】在数字化转型的浪潮中,云原生技术成为推动现代软件开发的关键力量。本篇文章将带你了解云原生的基本概念,探索它如何通过容器化、微服务架构以及持续集成和持续部署(CI/CD)的实践来提升应用的可伸缩性、灵活性和可靠性。你将学习到如何利用这些技术构建和部署在云端高效运行的应用,并理解它们对DevOps文化的贡献。
63 2
|
2月前
|
Kubernetes 关系型数据库 MySQL
Kubernetes入门:搭建高可用微服务架构
【10月更文挑战第25天】在快速发展的云计算时代,微服务架构因其灵活性和可扩展性备受青睐。本文通过一个案例分析,展示了如何使用Kubernetes将传统Java Web应用迁移到Kubernetes平台并改造成微服务架构。通过定义Kubernetes服务、创建MySQL的Deployment/RC、改造Web应用以及部署Web应用,最终实现了高可用的微服务架构。Kubernetes不仅提供了服务发现和负载均衡的能力,还通过各种资源管理工具,提升了系统的可扩展性和容错性。
144 3
|
3月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
2月前
|
监控 API 持续交付
后端开发中的微服务架构:从入门到精通
【10月更文挑战第26天】 在当今的软件开发领域,微服务架构已经成为了众多企业和开发者的首选。本文将深入探讨微服务架构的核心概念、优势以及实施过程中可能遇到的挑战。我们将从基础开始,逐步深入了解如何构建、部署和管理微服务。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和实用的建议。
47 0
|
5月前
|
Cloud Native 云计算 微服务
云原生入门指南:从零开始构建微服务
【8月更文挑战第31天】在数字化浪潮中,云原生技术正引领着软件开发的未来。本文旨在为初学者揭开云原生的神秘面纱,通过一个简易微服务的搭建过程,展示云原生应用的构建和部署。我们将从概念理解到实际操作,一步步带领读者走进云原生的世界,探索其背后的哲学与实践之美。
|
5月前
|
Kubernetes Cloud Native Docker
云原生入门:从容器化到微服务
【8月更文挑战第31天】在数字化浪潮中,云原生技术成为企业转型的核心驱动力。本文将带领读者从零开始,探索如何利用云原生技术构建现代、高效的应用架构。我们将一起学习容器化的基础,深入理解Docker和Kubernetes的工作原理,并实践构建一个简单的微服务应用。通过代码示例和实操演练,让理论与实践相结合,为读者揭开云原生技术的神秘面纱。
|
2月前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
148 6
|
2月前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
56 1
|
1月前
|
Java 开发者 微服务
从单体到微服务:如何借助 Spring Cloud 实现架构转型
**Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
161 69
从单体到微服务:如何借助 Spring Cloud 实现架构转型