万字 +图片解析死信队列和死信实战演练

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 万字 +图片解析死信队列和死信实战演练
📒 博客首页: 崇尚学技术的科班人
🏇 小肖来了
🍣 今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》🍣
🍣 有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了🍣
🍣 希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏 博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗 同时也非常感谢各位小伙伴们的支持💗

1、死信队列

1.1、概念

  • 死信:就是无法被消费的消息。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  • 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

1.2、死信来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到队列中)。
  3. 消息被拒绝并且requeue = false

1.3、死信实战

1.3.1、代码架构图

在这里插入图片描述

1.3.2、TTL过期情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 最为复杂的就是消费者01,它需要进行 死信交换机绑定死信队列普通交换机绑定普通队列普通队列绑定死信交换机
  • 我们为了让消息不被消费,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

4. 测试结果

在这里插入图片描述

  • 所有的消息在超过过期时间之后,全部转移到了死信队列中

在这里插入图片描述

1.3.3、队列达到最大长度情况

1. 消费者01

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 这里我们将过期时间参数改为了队列最大长度
  • 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和TTL过期情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 我们将对应的设置过期时间注释掉

4. 测试结果

  • 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变

在这里插入图片描述

  • 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信

1.3.4、消息被拒情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /**
     * 死信实战
     * 消费者01
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        //map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            String msg = new String(var2.getBody(),"UTF-8");
            if(msg.equals("info5")){
                System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" );
                channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("Consumer01控制台接收到的消息是:" + msg);
                channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
            }

        };
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});

    }
}

  • 这里我们将队列最大长度注释掉
  • 我们还需要开启手动应答,因为不开启就不会存在消息被拒 的问题。

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /**
     * 消费者02
     */

    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和队列达到最大长度情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();


        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 生产者和队列达到最大长度情况下的一模一样

4. 测试结果

  • 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。

在这里插入图片描述

在这里插入图片描述

  • 可见只有"info5"被作为死信。
相关文章
|
8天前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
JSON数据解析实战:从嵌套结构到结构化表格
|
15天前
|
数据可视化 测试技术 API
GraphQL开发工具选型指南:Apipost高效调试与文档生成实战解析
本文深入解析了GraphQL开发工具Apipost在高效调试与文档生成方面的优势,对比同类工具Apifox,突出其可视化界面、实时调试及自动化文档生成等特性。Apipost通过智能代码补全、错误提示等功能简化复杂Query编写,支持一键生成标准化文档,显著提升开发效率和团队协作效果,尤其适合中大型团队应对复杂业务场景。
|
1月前
|
数据采集 Web App开发 JavaScript
DOMParser解析TikTok页面中的图片元素
DOMParser解析TikTok页面中的图片元素
|
2月前
|
运维 Shell 数据库
Python执行Shell命令并获取结果:深入解析与实战
通过以上内容,开发者可以在实际项目中灵活应用Python执行Shell命令,实现各种自动化任务,提高开发和运维效率。
80 20
|
2月前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
252 20
|
2月前
|
数据采集 XML API
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
|
2月前
|
存储 运维 负载均衡
Hologres 查询队列全面解析
Hologres V3.0引入查询队列功能,实现请求有序处理、负载均衡和资源管理,特别适用于高并发场景。该功能通过智能分类和调度,确保复杂查询不会垄断资源,保障系统稳定性和响应效率。在电商等实时业务中,查询队列优化了数据写入和查询处理,支持高效批量任务,并具备自动流控、隔离与熔断机制,确保核心业务不受干扰,提升整体性能。
87 11
|
3月前
|
人工智能 搜索推荐 API
Cobalt:开源的流媒体下载工具,支持解析和下载全平台的视频、音频和图片,支持多种视频质量和格式,自动提取视频字幕
cobalt 是一款开源的流媒体下载工具,支持全平台视频、音频和图片下载,提供纯净、简洁无广告的体验
484 9
Cobalt:开源的流媒体下载工具,支持解析和下载全平台的视频、音频和图片,支持多种视频质量和格式,自动提取视频字幕
|
3月前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
3月前
|
自然语言处理 搜索推荐 数据安全/隐私保护
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
鸿蒙登录页面设计展示了 HarmonyOS 5.0(Next)的未来美学理念,结合科技与艺术,为用户带来视觉盛宴。该页面使用 ArkTS 开发,支持个性化定制和无缝智能设备连接。代码解析涵盖了声明式 UI、状态管理、事件处理及路由导航等关键概念,帮助开发者快速上手 HarmonyOS 应用开发。通过这段代码,开发者可以了解如何构建交互式界面并实现跨设备协同工作,推动智能生态的发展。
232 10
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】

推荐镜像

更多