Springboot+RocketMQ通过事务消息优雅的实现订单支付功能

简介: RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。

1. 事务消息


RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。


1.1 RocketMQ事务消息的原理


半事务消息发送:生产者将半事务消息发送至RocketMQ服务端。


消息持久化及返回Ack确认:RocketMQ服务端接收到半事务消息并持久化成功后,向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。


执行本地事务逻辑:根据发送结果执行本地事务,如果写入失败,此时half消息对业务不可见,本地事务逻辑不执行。


提交二次确认结果:根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。

TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态。


消息回查:(1) 对没有Commit/Rollback的事务消息,从服务端发起一次回查 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback。第一次回查后仍未获取到事务状态,则之后每隔30s会再次回查,最多重试15次,超过了就会默认丢弃此消息。


1.2 RocketMQ订单支付功能设计


数据库设计


/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `shop`;
/*Table structure for table `shop_order` */
DROP TABLE IF EXISTS `shop_order`;
CREATE TABLE `shop_order` (
  `id` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT '订单id',
  `total_num` INT DEFAULT NULL COMMENT '数量合计',
  `moneys` INT DEFAULT NULL COMMENT '金额合计',
  `pay_type` VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '支付类型,1、在线支付、0 货到付款',
  `create_time` DATETIME DEFAULT NULL COMMENT '订单创建时间',
  `update_time` DATETIME DEFAULT NULL COMMENT '订单更新时间',
  `pay_time` DATETIME DEFAULT NULL COMMENT '付款时间',
  `consign_time` DATETIME DEFAULT NULL COMMENT '发货时间',
  `end_time` DATETIME DEFAULT NULL COMMENT '交易完成时间',
  `username` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '用户名称',
  `recipients` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人',
  `recipients_mobile` VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人手机',
  `recipients_address` VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人地址',
  `weixin_transaction_id` VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '交易流水号',
  `order_status` INT DEFAULT NULL COMMENT '订单状态,0:未完成,1:已完成,2:已退货',
  `pay_status` INT DEFAULT NULL COMMENT '支付状态,0:未支付,1:已支付,2:支付失败',
  `is_delete` INT DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`id`),
  KEY `create_time` (`create_time`),
  KEY `status` (`order_status`),
  KEY `payment_type` (`pay_type`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_bin;
/*Data for the table `shop_order` */
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;


添加RocketMQ依赖


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>


bootstrap.yaml配置

server:
  port: 8085
spring:
  application:
    name: mall-order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
rocketmq:
  name-server: localhost:9876
  producer:
    group: test-group-producer


Service层

public interface OrderService extends IService<Order> {
       //添加订单
       void add(Order order);
       //修改订单支付状态
       void pay(String id);
}
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>
    implements OrderService{
    @Autowired
    OrderMapper orderMapper;
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void add(Order order) {
        order.setCreateTime(new Date());
        orderMapper.insert(order); //这里仅仅生成订单,还有扣减库存等等一系列操作省略
    }
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void pay(String id) {
        //模拟支付完成,修改订单的支付状态
        Order order = orderMapper.selectById(id);
        order.setPayStatus(1);
        order.setPayTime(new Date());
        orderMapper.updateById(order);
    }
}


创建生产者


@RestController
@Slf4j
public class TestController {
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    RocketMQTemplate rocketMQTemplate;
    @RequestMapping("/send")
    public String send(){
        String id = UUID.randomUUID().toString();
        String msg = "订单"+id+"支付成功";
        Order order=new Order();
        order.setId(id);
        order.setCreateTime(new Date());
        order.setMoneys(100);
        order.setUsername("张三");
        Message<String> message = MessageBuilder.withPayload(msg).setHeader("key",id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order", message, order);
        String transactionId = result.getTransactionId();
        String status = result.getSendStatus().name();
        log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
        return "success";
    }
}


创建消费者


@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "test-consumer",topic = "order",messageModel = MessageModel.CLUSTERING)
public class RocketMQListen implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println(body);
    }
}


生产者消息监听器

@Component
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    @Autowired
    OrderService orderService;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Order order = (Order) o;
        try {
            //生成订单
            orderService.add(order);
            return RocketMQLocalTransactionState.UNKNOWN;
        }catch (Throwable throwable){
            throwable.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String key = message.getHeaders().get("key").toString();
        System.out.println("回查订单id "+key+" 回查时间"+new Date());
        Order order = orderService.getById(key);
        if(order!=null) {
            long l = new Date().getTime() - order.getCreateTime().getTime();
            long time = l / (1000 * 60);
            //超时1分钟后,就会把未支付的订单进行删除
            if (time > 1) {
                orderService.removeById(key);
                System.out.println("订单" + key + "删除");
                //订单,库存等一系列操作
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            Integer payStatus = order.getPayStatus();
            if (payStatus == 1) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        else
            return RocketMQLocalTransactionState.ROLLBACK;
    }
}


测试


这里通过生产者发送五个事务消息,生成五个订单,然后两个订单在一分钟内修改支付状态为已支付,超时一分钟未支付就会删除订单回退。运行截图如下:


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
665 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
3月前
|
XML Java 应用服务中间件
【SpringBoot(一)】Spring的认知、容器功能讲解与自动装配原理的入门,带你熟悉Springboot中基本的注解使用
SpringBoot专栏开篇第一章,讲述认识SpringBoot、Bean容器功能的讲解、自动装配原理的入门,还有其他常用的Springboot注解!如果想要了解SpringBoot,那么就进来看看吧!
507 2
|
9月前
|
XML 前端开发 Java
SpringBoot实现文件上传下载功能
本文介绍了如何使用SpringBoot实现文件上传与下载功能,涵盖配置和代码实现。包括Maven依赖配置(如`spring-boot-starter-web`和`spring-boot-starter-thymeleaf`)、前端HTML页面设计、WebConfig路径映射配置、YAML文件路径设置,以及核心的文件上传(通过`MultipartFile`处理)和下载(利用`ResponseEntity`返回文件流)功能的Java代码实现。文章由Colorful_WP撰写,内容详实,适合开发者学习参考。
928 0
|
6月前
|
缓存 前端开发 Java
SpringBoot 实现动态菜单功能完整指南
本文介绍了一个动态菜单系统的实现方案,涵盖数据库设计、SpringBoot后端实现、Vue前端展示及权限控制等内容,适用于中后台系统的权限管理。
683 1
|
8月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
285 32
|
8月前
|
安全 Java API
Spring Boot 功能模块全解析:构建现代Java应用的技术图谱
Spring Boot不是一个单一的工具,而是一个由众多功能模块组成的生态系统。这些模块可以根据应用需求灵活组合,构建从简单的REST API到复杂的微服务系统,再到现代的AI驱动应用。
1112 8
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
263 0
|
7月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1456 0