万字 +图片解析死信队列和死信实战演练

简介: 万字 +图片解析死信队列和死信实战演练
📒 博客首页: 崇尚学技术的科班人
🏇 小肖来了
🍣 今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》🍣
🍣 有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了🍣
🍣 希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏 博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗 同时也非常感谢各位小伙伴们的支持💗

1、死信队列

1.1、概念

  • 死信:就是无法被消费的消息。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  • 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

1.2、死信来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到队列中)。
  3. 消息被拒绝并且requeue = false

1.3、死信实战

1.3.1、代码架构图

在这里插入图片描述

1.3.2、TTL过期情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 最为复杂的就是消费者01,它需要进行 死信交换机绑定死信队列普通交换机绑定普通队列普通队列绑定死信交换机
  • 我们为了让消息不被消费,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

4. 测试结果

在这里插入图片描述

  • 所有的消息在超过过期时间之后,全部转移到了死信队列中

在这里插入图片描述

1.3.3、队列达到最大长度情况

1. 消费者01

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 这里我们将过期时间参数改为了队列最大长度
  • 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和TTL过期情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 我们将对应的设置过期时间注释掉

4. 测试结果

  • 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变

在这里插入图片描述

  • 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信

1.3.4、消息被拒情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        //map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            String msg = new String(var2.getBody(),"UTF-8");
            if(msg.equals("info5")){
                System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" );
                channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("Consumer01控制台接收到的消息是:" + msg);
                channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
            }

        };
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});

    }
}

  • 这里我们将队列最大长度注释掉
  • 我们还需要开启手动应答,因为不开启就不会存在消息被拒 的问题。

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和队列达到最大长度情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 生产者和队列达到最大长度情况下的一模一样

4. 测试结果

  • 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。

在这里插入图片描述

在这里插入图片描述

  • 可见只有"info5"被作为死信。
相关文章
|
6天前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
21 1
|
16天前
|
监控 前端开发 JavaScript
实战篇:商品API接口在跨平台销售中的有效运用与案例解析
随着电子商务的蓬勃发展,企业为了扩大市场覆盖面,经常需要在多个在线平台上展示和销售产品。然而,手工管理多个平台的库存、价格、商品描述等信息既耗时又容易出错。商品API接口在这一背景下显得尤为重要,它能够帮助企业在不同的销售平台之间实现商品信息的高效同步和管理。本文将通过具体的淘宝API接口使用案例,展示如何在跨平台销售中有效利用商品API接口,以及如何通过代码实现数据的统一管理。
|
29天前
|
存储 搜索推荐 人机交互
Qt鼠标事件全面解析:从基础到实战
Qt鼠标事件全面解析:从基础到实战
114 0
|
1月前
|
存储 人工智能 算法
【冲击蓝桥篇】动态规划(上):真题实战+思路解析
【冲击蓝桥篇】动态规划(上):真题实战+思路解析
|
2月前
|
应用服务中间件 PHP 开发工具
Nginx解析环境搭建及实战
Nginx解析环境搭建及实战
25 0
|
18天前
|
存储 缓存 算法
Python中collections模块的deque双端队列:深入解析与应用
在Python的`collections`模块中,`deque`(双端队列)是一个线程安全、快速添加和删除元素的双端队列数据类型。它支持从队列的两端添加和弹出元素,提供了比列表更高的效率,特别是在处理大型数据集时。本文将详细解析`deque`的原理、使用方法以及它在各种场景中的应用。
|
20天前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
43 1
|
5天前
|
SQL API 数据库
Python中的SQLAlchemy框架:深度解析与实战应用
【4月更文挑战第13天】在Python的众多ORM(对象关系映射)框架中,SQLAlchemy以其功能强大、灵活性和易扩展性脱颖而出,成为许多开发者首选的数据库操作工具。本文将深入探讨SQLAlchemy的核心概念、功能特点以及实战应用,帮助读者更好地理解和使用这一框架。
|
6天前
|
Java 数据库 Spring
切面编程的艺术:Spring动态代理解析与实战
切面编程的艺术:Spring动态代理解析与实战
19 0
切面编程的艺术:Spring动态代理解析与实战
|
23天前
|
机器学习/深度学习 人工智能 自然语言处理
大模型落地实战指南:从选择到训练,深度解析显卡选型、模型训练技、模型选择巧及AI未来展望---打造AI应用新篇章
大模型落地实战指南:从选择到训练,深度解析显卡选型、模型训练技、模型选择巧及AI未来展望---打造AI应用新篇章
大模型落地实战指南:从选择到训练,深度解析显卡选型、模型训练技、模型选择巧及AI未来展望---打造AI应用新篇章

推荐镜像

更多