Springboot----项目整合微信支付与RabbitMQ(使用RabbitMQ延迟插件实现订单管理)

简介: 主要介绍如何在Springboot项目支付模块中使用RabbitMQ实现延迟队列,采用的是RabbitMQ的延迟插件

一:🐱‍🏍问题引入

前面提到可以使用RabbitMQ实现订单到期自动取消以及当超过某一时间订单还是显示未支付时候就可以通过延迟队列主动向微信支付后台进行订单查询。

由于RabbitMQ是基于Erlang语言开发的,因此要使用RabbitMQ,首先要安装Erlang,至于安装教程可以自行百度解决,然后就是安装RabbitMQ并进行相关配置。

在RabbitMQ 3.6.X之前,要实现延迟队列只能通过TTL(生存时间)+ DLX(死信交换机)来实现,实现过程并不复杂。在RabbitMQ官方文档中有这样一句话:Dead letter exchanges (DLXs) are normal exchanges. They can be any of the usual types and are declared as usual. 意思是死信交换机是一个普通的交换机,它可以被当做普通交换机来使用。关键点在于这个交换机是用来存放过期消息的,所以这一交换机就称为死信交换机,流程图见下图:

设置过期时间有两种方法,一种是单独针对每一条消息进行设置,但是这样会因为时序问题形成队头阻塞现象。因为队列消息是按序消费的,如果队头的消息延迟时间是 10s, 后面的消息都要等至少 10s 后才可以进行消费。另一种方法是设置过期时间在消息队列上,如果过期时间设置在队列上,所有发送到队列的消息延迟时间都是该队列设定值,而业务需求延迟时间是随着重试次数线性增长的,这样就需要创建很多个固定延迟时间的队列。

可以看到无论采用哪一种方式都有很大的缺陷,但是在这个项目中是可以采用第二种方式的,因为针对每一笔订单设置的过期时间都为5分钟。

在RabbitMQ 3.6.X之后,RabbitMQ推出了delay-message 插件,该插件可以更好地实现延迟队列,当然,要使用这个插件还需要自行进行安装,具体安装过程可以自己百度解决。使用该插件的好处有两个方面,当然就是针对上面两种方案的缺陷来改进的。

首先,它是将延迟时间设置在消息上的,这样只要创建一个队列即可;

其次,指定为延迟类型的交换机在接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间,在达到可投递时间时才投递至目标队列,这样就不存在队头阻塞现象。

二:🐱‍🏍相关配置

#rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 用来配置消息发送到交换器之后是否触发回调方法
    publisher-returns: true   # 触发路由失败消息的回调(用不上)
    listener:
      simple:
        acknowledge-mode: manual  #手动确认
        prefetch: 1    #限流(海量数据,同时只能过来一条)

需要说明的是,publisher-confirm-type设置为correlated表示消息发送到交换机之后会发送回调通知给生产者,如果由于RabbitMQ内部原因导致交换机接收失败返回失败回调信息之后需要进行异常处理。publisher-returns这一参数实质上是用不上的,因为延时消息是从磁盘上读取消息然后发送(后台任务),发送消息时候无法保证两点:

  1. 发送时消息路由队列还存在
  2. 发送时原连接仍然支持回调方法

因为消息写磁盘和读磁盘消息发送存在时间差,两个时间点的队列和连接情况可能不同,所以不支持Mandatory设置。(publisher-returns: true必须与template.mandatory: true一起设置路由失败消息的回调才能生效)。

此外,为了保证消息传递的可靠性,我将消息确认机制设置为手动确认,同时每次只能过来一条数据。

三:🐱‍🏍代码实现

3.1:初始化设置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayMessageConfig {
    //交换机名称
    public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange";
    //消息队列名称
    public static final String DELAY_QUEUE_ORDER_NAME = "plugin.delay.order.queue";  //订单队列
    public static final String DELAY_QUEUE_REFUND_NAME = "plugin.delay.refund.queue";  //退款处理队列
    //路由名称
    public static final String ROUTING_KEY_ORDER = "plugin.delay.routing_order";  //订单路由名称
    public static final String ROUTING_KEY_REFUND = "plugin.delay.routing_refund";  //退款路由名称
    /**
     * 声明一个订单延迟队列
     * @return
     */
    @Bean("ORDER_DELAY_QUEUE")
    Queue orderDelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_ORDER_NAME).build();
    }
    /**
     * 声明一个退款延迟队列
     * @return
     */
    @Bean("REFUND_DELAY_QUEUE")
    Queue refundDelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_REFUND_NAME).build();
    }
    /**
     * 声明一个交换机
     * @return
     */
    @Bean("DELAY_EXCHANGE")
    CustomExchange delayExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);
    }
    /**
     * 订单延迟队列绑定
     * @param orderDelayQueue
     * @param delayExchange
     * @return
     */
    @Bean
    Binding orderDelayQueueBinding(@Qualifier("ORDER_DELAY_QUEUE") Queue orderDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){
        return BindingBuilder.bind(orderDelayQueue).to(delayExchange).with(ROUTING_KEY_ORDER).noargs();
    }
    /**
     * 退款延迟队列绑定
     * @param refundDelayQueue
     * @param delayExchange
     * @return
     */
    @Bean
    Binding refundDelayQueueBinding(@Qualifier("REFUND_DELAY_QUEUE") Queue refundDelayQueue,@Qualifier("DELAY_EXCHANGE") CustomExchange delayExchange){
        return BindingBuilder.bind(refundDelayQueue).to(delayExchange).with(ROUTING_KEY_REFUND).noargs();
    }
}

说明:这里声明的交换机类型为直接交换机,交换机通过路由键与不同队列进行绑定,这里是一个交换机绑定了两个队列,因为除了用户下单需要用到延迟队列之外,用户退款也需要用到延迟队列,具体细节我会在后面讲解。

3.2:生产者

import com.fasterxml.jackson.databind.ObjectMapper;
import com.my.reggie.pojo.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @description : 消息生产者
 */
@Component
@Slf4j
public class RabbitmqDelayProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     *
     * @param No 消息
     * @param messageId 唯一id
     * @param exchangeName 交换机
     * @param key 路由键
     * @param delayTime 延迟时间(毫秒)
     */
    public void publish(String No, String messageId, String exchangeName, String key, Integer delayTime) {
        /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
         * 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            assert correlationData != null;
            String message_Id = correlationData.getId();
            //返回成功,表示消息被正常投递到交换机
            if (ack) {
                log.info("信息投递到交换机成功,messageId:{}",message_Id);
            } else {
                log.error("交换机不可达,messageId:{} 原因:{}",message_Id,cause);
            }
        });
        /**
         * 延时消息是从磁盘读取消息然后发送(后台任务),发送消息的时候无法保证两点:
         *
         * 1、发送时消息路由的队列还存在
         * 2、发送时原连接仍然支持回调方法
         * 原因:消息写磁盘和从磁盘读取消息发送存在时间差,两个时间点的队列和连接情况可能不同。所以不支持Mandatory设置
         */
        /* 消息失败的回调
         * 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE
         * 用不上
         */
        rabbitTemplate.setMandatory(false);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String message_Id = returnedMessage.getMessage().getMessageProperties().getMessageId();
                    byte[] message = returnedMessage.getMessage().getBody();
                    Integer replyCode = returnedMessage.getReplyCode();
                    String replyText = returnedMessage.getReplyText();
                    String exchange = returnedMessage.getExchange();
                    String routingKey = returnedMessage.getRoutingKey();
                    log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                            new String(message),message_Id,replyCode,replyText,exchange,routingKey);
                }
        );
        // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, No, message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay(delayTime);
            return message;
        }, correlationData);
    }
}

3.3:消费者

import com.my.reggie.service.WxPayService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitmqDelayConsumer {
    @Autowired
    private WxPayService wxPayService;
    /**
     * 监听订单延迟队列
     * @param orderNo
     * @throws Exception
     */
    @RabbitListener(queues = {"plugin.delay.order.queue"})
    public void orderDelayQueue(String orderNo, Message message, Channel channel) throws Exception {
        log.info("订单延迟队列开始消费...");
        try {
            //处理订单
            wxPayService.checkOrderStatus(orderNo);
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消息接收成功");
        } catch (Exception e) {
            e.printStackTrace();
            //消息重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            log.info("消息接收失败,重新入队");
        }
    }
    /**
     * 监听退款延迟队列
     * @param refundNo
     */
    @RabbitListener(queues = {"plugin.delay.refund.queue"})
    public void refundDelayQueue(String refundNo, Message message, Channel channel) throws Exception {
        log.info("退款延迟队列开始消费...");
        try {
            //处理退款信息
            wxPayService.checkRefundStatus(refundNo);
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消息接收成功");
        } catch (Exception e) {
            e.printStackTrace();
            //消息重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            log.info("消息接收失败,重新入队");
        }
    }
}

由于开启了手动确认机制,假如成功处理消息,就需要向服务器告知消息已经成功被我消费,可以在队列中删除该条消息,否则服务器会不断重新发送消息,要是出现异常就需要将消息重新放回队列中。

四:🐱‍🏍友情链接

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
22天前
|
Java 应用服务中间件
SpringBoot获取项目文件的绝对路径和相对路径
SpringBoot获取项目文件的绝对路径和相对路径
62 1
SpringBoot获取项目文件的绝对路径和相对路径
|
13天前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
25 2
|
17天前
|
分布式计算 关系型数据库 MySQL
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型 图像处理 光通信 分布式计算 算法语言 信息技术 计算机应用
36 8
|
1月前
|
JavaScript 前端开发 Java
解决跨域问题大集合:vue-cli项目 和 java/springboot(6种方式) 两端解决(完美解决)
这篇文章详细介绍了如何在前端Vue项目和后端Spring Boot项目中通过多种方式解决跨域问题。
359 1
解决跨域问题大集合:vue-cli项目 和 java/springboot(6种方式) 两端解决(完美解决)
|
24天前
|
JavaScript 前端开发 Java
SpringBoot项目的html页面使用axios进行get post请求
SpringBoot项目的html页面使用axios进行get post请求
42 2
|
24天前
|
前端开发 Java Spring
SpringBoot项目thymeleaf页面支持词条国际化切换
SpringBoot项目thymeleaf页面支持词条国际化切换
55 2
|
24天前
|
JSON Java 数据库
SpringBoot项目使用AOP及自定义注解保存操作日志
SpringBoot项目使用AOP及自定义注解保存操作日志
35 1
|
26天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
8天前
|
JavaScript 前端开发 Java
SpringBoot项目的html页面使用axios进行get post请求
SpringBoot项目的html页面使用axios进行get post请求
24 0
|
26天前
|
关系型数据库 MySQL Java
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
26 0
下一篇
无影云桌面