死信队列实现订单超时代码实例(RabbitMq)

简介: 死信队列实现订单超时代码实例(RabbitMq)

前面介绍了RabbitMq的几种模式,这篇文章主要介绍死信队列的使用和实际应用场景订单超时怎么和死信队列结合。


一、业务场景


用户在淘宝或者京东下单的时候,一般会预购,30分钟之后如果没有付款则订单超时,这个功能怎么实现呢?

1、可以存入mysql数据库,然后每隔一段时间就定时器查询一次数据库,这样对数据库的io负载很大,而且百分之90都是没必要的开销。

2、可以和rabbitMq死信队列TTL来实现。


二、代码实例


死信队列满足的条件是什么呢,当队列订单超时,当队列超过最大值,当消费者消费失败主动调用basicNack方法进入死信队列。在配置文件需要加入几行参数:

default-requeue-rejected=false 这个一定要设置成为false。# 默认是auto 自动确定是否收到消息,如果消费失败则会一直进入队列消费 # 改为manual手动调用change.basicAck确认 # 改为none 若没收到或者消费成功都不会回到队列 spring.rabbitmq.listener.simple.acknowledge-mode=manual

# ----- RabbitMq -------- #
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
# 默认true,代表消费报错会重新回到队列。false则不会回到队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
# 默认是auto 自动确定是否收到消息,如果消费失败则会一直进入队列消费
# 改为manual手动调用change.basicAck确认
# 改为none 若没收到或者消费成功都不会回到队列
spring.rabbitmq.listener.simple.acknowledge-mode=manual

接下来创建死信队列和business队列:

public static final String business_fanout_exchange = "business-fanout-exchange";
    public static final String dead_letter_exchange = "dead-letter-exchange";
    public static final String dead_letter_routing_keyA = "dead-letter-routing-keyA";
    public static final String dead_letter_routing_keyB = "dead-letter-routing-keyB";
    public static final String business_queueA = "business-queueA";
    public static final String dead_letter_queueA = "dead-letter-queueA";
    public static final String dead_letter_queueB = "dead-letter-queueB";
    /**
     * business交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange businessFanoutExchange() {
        return new FanoutExchange(business_fanout_exchange);
    }
    /**
     * 死信交换机
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(dead_letter_exchange);
    }
    /**
     * business队列
     */
    @Bean
    public Queue businessQueueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", dead_letter_exchange);
        //声明当前队列绑定的死信路由key
        args.put("x-dead-letter-routing-key", dead_letter_routing_keyA);
        return QueueBuilder.durable(business_queueA).withArguments(args).build();
    }
    /**
     * 声明死信队列
     */
    @Bean
    public Queue deadLetterQueueA() {
        return new Queue(dead_letter_queueA);
    }
    @Bean
    public Queue deadLetterQueueB() {
        return new Queue(dead_letter_queueB);
    }
    /**
     * 业务A绑定交换机
     */
    @Bean
    public Binding businessQueueBindingA(@Qualifier("businessQueueA") Queue businessQueueA,
        @Qualifier("businessFanoutExchange") FanoutExchange businessFanoutExchange) {
        return BindingBuilder.bind(businessQueueA).to(businessFanoutExchange);
    }
    /**
     * 死信绑定交换机
     */
    @Bean
    public Binding deadQuereBindingA(@Qualifier("deadLetterQueueA") Queue deadLetterQueueA,
        @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueueA).to(deadLetterExchange).with(dead_letter_routing_keyA);
    }
    @Bean
    public Binding deadQuereBindingB(@Qualifier("deadLetterQueueB") Queue deadLetterQueueB,
        @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueueB).to(deadLetterExchange).with(dead_letter_routing_keyB);
    }

死信队列访问接口:

/**
     * 死信
     */
    @RequestMapping("/deadTo")
    public void deadTo() {
        String dead_letter = "dead-letter";
        rabbitTemplate.convertAndSend(DeadConfig.business_fanout_exchange, "", dead_letter);
    }

消费监听:

/**
     * 死信 【业务端】
     */
    @RabbitListener(queues = DeadConfig.business_queueA)
    public void dead(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("业务接收到消息AAAA");
        boolean ack = true;
        try {
            if (msg.contains("dead-letter")) {
                throw new RuntimeException("消费异常");
            }
        } catch (Exception e) {
            ack = false;
        }
        System.out.println(
            "message.getMessageProperties().getDeliveryTag():" + message.getMessageProperties().getDeliveryTag());
        if (!ack) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    /**
     * 死信 【死信队列监听】
     *
     * @param message
     */
    @RabbitListener(queues = DeadConfig.dead_letter_queueA)
    public void deadQueueA(Message message, Channel channel) throws IOException {
        System.out.println("接收到死信队列消息AAA:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

这时候死信就可以接受到数据了,如果业务发生异常,这时候进入死信队列消息,可以进行我们的业务。

如果这时候要实现订单超时功能可以改成下面的代码

/**
     * business队列
     */
    @Bean
    public Queue businessQueueA() {
        Map<String, Object> args = new HashMap<>(3);
        //订单最多存在10s
        args.put("x-message-ttl",10000);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", dead_letter_exchange);
        //声明当前队列绑定的死信路由key
        args.put("x-dead-letter-routing-key", dead_letter_routing_keyA);
        return QueueBuilder.durable(business_queueA).withArguments(args).build();
    }
 /**
     * 死信
     */
    @RequestMapping("/deadTo")
    public void deadTo() {
        //添加未支付 订单到mysql , 0代表未支付,1代表已支付
        int order = 0;
        rabbitMqService.addUserOrder(order);
        String dead_letter = "dead-letter";
        rabbitTemplate.convertAndSend(DeadConfig.business_fanout_exchange, "", dead_letter);
    }
 /**
     * 死信 【业务端】
     */
    @RabbitListener(queues = DeadConfig.business_queueA)
    public void dead(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("业务接收到消息AAAA");
        boolean ack = true;
        try {
            if (msg.contains("dead-letter")) {
                throw new RuntimeException("消费异常");
            }
        } catch (Exception e) {
            ack = false;
        }
        System.out.println(
            "message.getMessageProperties().getDeliveryTag():" + message.getMessageProperties().getDeliveryTag());
        if (!ack) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    /**
     * 死信 【死信队列监听】
     *
     * @param message
     */
    @RabbitListener(queues = DeadConfig.dead_letter_queueA)
    public void deadQueueA(Message message, Channel channel) throws IOException {
        // 死信队列监听业务过期消息,查看数据库是否已经修改数据,0未支付,1代表已支付
        int order = getMysqlUserOrder();
        if(order == 0){
            //则吧数据库改为订单超时
            System.out.println("订单超时");
        }else{
            // 则吧数据库订单改为发货
            System.out.println("订单发货");
        }
        System.out.println("接收到死信队列消息AAA:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
物联网
MQTT常见问题之用单片机接入阿里MQTT实例失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
21天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
24 1
|
21天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
114 0
|
21天前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
35 4
|
21天前
|
消息中间件 人工智能 Java
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
33 1
|
21天前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
45 1
|
21天前
|
消息中间件
第十五章 RabbitMQ 延迟队列
第十五章 RabbitMQ 延迟队列
18 0
|
21天前
|
消息中间件
RabbitMQ 死信队列
RabbitMQ 死信队列
29 0
RabbitMQ 死信队列
|
21天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
71 0
|
21天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
246 4