Springboot+RocketMQ通过事务消息优雅的实现订单支付功能

简介: RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。

1. 事务消息


RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。


1.1 RocketMQ事务消息的原理


半事务消息发送:生产者将半事务消息发送至RocketMQ服务端。


消息持久化及返回Ack确认:RocketMQ服务端接收到半事务消息并持久化成功后,向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。


执行本地事务逻辑:根据发送结果执行本地事务,如果写入失败,此时half消息对业务不可见,本地事务逻辑不执行。


提交二次确认结果:根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。

TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态。


消息回查:(1) 对没有Commit/Rollback的事务消息,从服务端发起一次回查 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback。第一次回查后仍未获取到事务状态,则之后每隔30s会再次回查,最多重试15次,超过了就会默认丢弃此消息。


1.2 RocketMQ订单支付功能设计


数据库设计


/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `shop`;
/*Table structure for table `shop_order` */
DROP TABLE IF EXISTS `shop_order`;
CREATE TABLE `shop_order` (
  `id` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT '订单id',
  `total_num` INT DEFAULT NULL COMMENT '数量合计',
  `moneys` INT DEFAULT NULL COMMENT '金额合计',
  `pay_type` VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '支付类型,1、在线支付、0 货到付款',
  `create_time` DATETIME DEFAULT NULL COMMENT '订单创建时间',
  `update_time` DATETIME DEFAULT NULL COMMENT '订单更新时间',
  `pay_time` DATETIME DEFAULT NULL COMMENT '付款时间',
  `consign_time` DATETIME DEFAULT NULL COMMENT '发货时间',
  `end_time` DATETIME DEFAULT NULL COMMENT '交易完成时间',
  `username` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '用户名称',
  `recipients` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人',
  `recipients_mobile` VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人手机',
  `recipients_address` VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人地址',
  `weixin_transaction_id` VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '交易流水号',
  `order_status` INT DEFAULT NULL COMMENT '订单状态,0:未完成,1:已完成,2:已退货',
  `pay_status` INT DEFAULT NULL COMMENT '支付状态,0:未支付,1:已支付,2:支付失败',
  `is_delete` INT DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`id`),
  KEY `create_time` (`create_time`),
  KEY `status` (`order_status`),
  KEY `payment_type` (`pay_type`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_bin;
/*Data for the table `shop_order` */
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;


添加RocketMQ依赖


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>


bootstrap.yaml配置

server:
  port: 8085
spring:
  application:
    name: mall-order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
rocketmq:
  name-server: localhost:9876
  producer:
    group: test-group-producer


Service层

public interface OrderService extends IService<Order> {
       //添加订单
       void add(Order order);
       //修改订单支付状态
       void pay(String id);
}
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>
    implements OrderService{
    @Autowired
    OrderMapper orderMapper;
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void add(Order order) {
        order.setCreateTime(new Date());
        orderMapper.insert(order); //这里仅仅生成订单,还有扣减库存等等一系列操作省略
    }
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void pay(String id) {
        //模拟支付完成,修改订单的支付状态
        Order order = orderMapper.selectById(id);
        order.setPayStatus(1);
        order.setPayTime(new Date());
        orderMapper.updateById(order);
    }
}


创建生产者


@RestController
@Slf4j
public class TestController {
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    RocketMQTemplate rocketMQTemplate;
    @RequestMapping("/send")
    public String send(){
        String id = UUID.randomUUID().toString();
        String msg = "订单"+id+"支付成功";
        Order order=new Order();
        order.setId(id);
        order.setCreateTime(new Date());
        order.setMoneys(100);
        order.setUsername("张三");
        Message<String> message = MessageBuilder.withPayload(msg).setHeader("key",id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order", message, order);
        String transactionId = result.getTransactionId();
        String status = result.getSendStatus().name();
        log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
        return "success";
    }
}


创建消费者


@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "test-consumer",topic = "order",messageModel = MessageModel.CLUSTERING)
public class RocketMQListen implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println(body);
    }
}


生产者消息监听器

@Component
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    @Autowired
    OrderService orderService;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Order order = (Order) o;
        try {
            //生成订单
            orderService.add(order);
            return RocketMQLocalTransactionState.UNKNOWN;
        }catch (Throwable throwable){
            throwable.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String key = message.getHeaders().get("key").toString();
        System.out.println("回查订单id "+key+" 回查时间"+new Date());
        Order order = orderService.getById(key);
        if(order!=null) {
            long l = new Date().getTime() - order.getCreateTime().getTime();
            long time = l / (1000 * 60);
            //超时1分钟后,就会把未支付的订单进行删除
            if (time > 1) {
                orderService.removeById(key);
                System.out.println("订单" + key + "删除");
                //订单,库存等一系列操作
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            Integer payStatus = order.getPayStatus();
            if (payStatus == 1) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        else
            return RocketMQLocalTransactionState.ROLLBACK;
    }
}


测试


这里通过生产者发送五个事务消息,生成五个订单,然后两个订单在一分钟内修改支付状态为已支付,超时一分钟未支付就会删除订单回退。运行截图如下:


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
消息中间件 缓存 Java
手写模拟Spring Boot启动过程功能
【11月更文挑战第19天】Spring Boot自推出以来,因其简化了Spring应用的初始搭建和开发过程,迅速成为Java企业级应用开发的首选框架之一。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,帮助读者深入理解其工作机制。
20 3
|
4天前
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
18 0
|
1月前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
42 4
|
1月前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,包括版本兼容性、安全性、性能调优等方面。
143 1
|
28天前
|
Java API 数据库
Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐
本文通过在线图书管理系统案例,详细介绍如何使用Spring Boot构建RESTful API。从项目基础环境搭建、实体类与数据访问层定义,到业务逻辑实现和控制器编写,逐步展示了Spring Boot的简洁配置和强大功能。最后,通过Postman测试API,并介绍了如何添加安全性和异常处理,确保API的稳定性和安全性。
35 0
|
2月前
|
Java 关系型数据库 MySQL
创建一个SpringBoot项目,实现简单的CRUD功能和分页查询
【9月更文挑战第6天】该内容介绍如何使用 Spring Boot 实现具备 CRUD 功能及分页查询的项目。首先通过 Spring Initializr 创建项目并选择所需依赖;其次配置数据库连接,并创建实体类与数据访问层;接着构建服务层处理业务逻辑;最后创建控制器处理 HTTP 请求。分页查询可通过添加 URL 参数实现。
|
17天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。首先,创建并配置 Spring Boot 项目,实现后端 API;然后,使用 Ant Design Pro Vue 创建前端项目,配置动态路由和菜单。通过具体案例,展示了如何快速搭建高效、易维护的项目框架。
95 62
|
13天前
|
前端开发 Java easyexcel
SpringBoot操作Excel实现单文件上传、多文件上传、下载、读取内容等功能
SpringBoot操作Excel实现单文件上传、多文件上传、下载、读取内容等功能
53 8
|
15天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,帮助开发者提高开发效率和应用的可维护性。
33 2
|
19天前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
22 1