108.【RabbitsMQ】(十)

简介: 108.【RabbitsMQ】

3.死信队列 (接盘侠) DLX

(1).概述

支持队列TTL,不支持消息TTL.

DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:

  1. 消息被拒绝
  2. 消息过期
  3. 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可.

(2).生产者

正常生产者

这里我们只负责正常生产者: 配置我们正常的非死信路由key

package com.jsxs.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * @Author Jsxs
 * @Date 2023/4/2 11:24
 * @PackageName:com.jsxs.service
 * @ClassName: OrderService
 * @Description: TODO :  分别给对应的路由Key发送消息.
 * @Version 1.0
 */
@Service
public class OrderService {
    @Resource   //  获取rabbitMQ的服务
    private RabbitTemplate rabbitTemplate;
    /**
     *
     * @param userId
     * @param productID
     * @param num
     */
   public void makeOrder(String userId,String productID,int num){
       //1. 生成订单
       String orderID = UUID.randomUUID().toString().replace("-","");
       System.out.println("订单号已经生产成功-"+orderID);
       //2. 设置交换机名字和路由
       String exchangeName="ttl_message_order_producer";
       //3. 发送消息
       // 参数: (交换机、路由key或队列名、消息内容)
       rabbitTemplate.convertAndSend(exchangeName,"one","1");
       rabbitTemplate.convertAndSend(exchangeName,"two","2");
       rabbitTemplate.convertAndSend(exchangeName,"three","3");
       rabbitTemplate.convertAndSend(exchangeName,"four","4");
   }
}
(3).配置文件-(非死信配置)

我们需要添加死信的交换机与死信的路由key。将非死信交换机与死信交换机做一个连接的操作。

package com.jsxs.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author Jsxs
 * @Date 2023/4/3 16:16
 * @PackageName:com.jsxs.config
 * @ClassName: TTLRabbitMQConfig
 * @Description: TODO
 * @Version 1.0
 */
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import java.util.HashMap;
@Configuration
public class TTLRabbitMQConfig {
        // 1. 声明注册direct模式的交换机
    @Bean
    public DirectExchange directExchange(){
        //  (交换机的名字、是否持久化。是否自动删除)
        return new DirectExchange("ttl_message_order_producer",true,false);
    }
        // 2. 声明队列: 以及过期时间
    @Bean
    public Queue SmsQueue(){
        HashMap<String, Object> args = new HashMap<>();
        //  下面的key需要向web界面去寻找....
        // 设置队列的过期时间
        args.put("x-message-ttl",5000);
        // 死信交换机
        args.put("x-dead-letter-exchange","dead_order_producer");
        // 死信路由key
        args.put("x-dead-letter-routing-key","dead_sms");  // fanout 不需要配置路由key
        return new Queue("sms.ttl.queue",true,false,false,args);
    }
        @Bean
    public Queue MessageQueue(){
            HashMap<String, Object> args = new HashMap<>();
            //  下面的key需要向web界面去寻找....
            // 设置队列的过期时间
            args.put("x-message-ttl",5000);
            // 死信交换机
            args.put("x-dead-letter-exchange","dead_order_producer");
            // 死信路由key
            args.put("x-dead-letter-routing-key","dead_message");  // fanout 不需要配置路由key
        return new Queue("message.ttl.queue",true,false,false,args);
    }
    //  ---------------上面我们同时设置队列过期时间和消息过期时间-----下面我们设置仅消息过期时间
    @Bean
    public Queue EmailQueue(){
        HashMap<String, Object> args = new HashMap<>();
        //  下面的key需要向web界面去寻找....
        // 设置队列的过期时间
        args.put("x-message-ttl",5000);
        // 死信交换机
        args.put("x-dead-letter-exchange","dead_order_producer");
        // 死信路由key
        args.put("x-dead-letter-routing-key","dead_email");  // fanout 不需要配置路由key
        return new Queue("email.ttl.queue",true,false,false,args);
    }
    @Bean
    public Queue WeChatQueue(){
        HashMap<String, Object> args = new HashMap<>();
        //  下面的key需要向web界面去寻找....
        // 设置队列的过期时间
        args.put("x-message-ttl",5000);
        // 死信交换机
        args.put("x-dead-letter-exchange","dead_order_producer");
        // 死信路由key
        args.put("x-dead-letter-routing-key","dead_wechat");  // fanout 不需要配置路由key
        return new Queue("wechat.ttl.queue",true,false,false,args);
    }
        // 3. 将队列与交换机进行绑定的操作
    @Bean
    public Binding SmsBind(){
        return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("one");
    }
    @Bean
    public Binding MessageBind(){
        return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("two");
    }
    @Bean
    public Binding EmailBind(){
        return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("three");
    }
    @Bean
    public Binding WechatBind(){
        return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("four");
    }
}
(4).配置文件-(死信配置)
package com.jsxs.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 *      死信队列配置
 */
@Configuration
public class DeadRabbitMQConfig {
        // 1. 声明注册direct模式的交换机
    @Bean
    public DirectExchange deadDirectExchange(){
        //  (交换机的名字、是否持久化。是否自动删除)
        return new DirectExchange("dead_order_producer",true,false);
    }
        // 2. 声明队列: 以及过期时间
    @Bean
    public Queue deadSmsQueue(){
        return new Queue("dead.sms.queue",true);
    }
        @Bean
    public Queue deadMessageQueue(){
        return new Queue("dead.message.queue",true);
    }
    @Bean
    public Queue deadEmailQueue(){
        return new Queue("dead.email.queue",true);
    }
    @Bean
    public Queue deadWeChatQueue(){
        return new Queue("dead.wechat.queue",true);
    }
    // 3. 将队列与交换机进行绑定的操作
    @Bean
    public Binding deadSmsBind(){
        return BindingBuilder.bind(deadSmsQueue()).to(deadDirectExchange()).with("dead_sms");
    }
    @Bean
    public Binding deadMessageBind(){
        return BindingBuilder.bind(deadMessageQueue()).to(deadDirectExchange()).with("dead_message");
    }
    @Bean
    public Binding deadEmailBind(){
        return BindingBuilder.bind(deadEmailQueue()).to(deadDirectExchange()).with("dead_email");
    }
    @Bean
    public Binding deadWechatBind(){
        return BindingBuilder.bind(deadWeChatQueue()).to(deadDirectExchange()).with("dead_wechat");
    }
}
(5).消费者

email

package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:45
 * @PackageName:com.jsxs.service.faout
 * @ClassName: EmailConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"email.ttl.queue"})  // 这个客户端的队列是哪个?
public class EmailConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("email接收到的信息是:->"+message);
    }
}

message

package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: MessageConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"message.ttl.queue"})  // 这个客户端的队列是哪个?
public class MessageConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("Message接收到的信息是:->"+message);
    }
}

sms

package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: SmsConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"sms.ttl.queue"})  // 这个客户端的队列是哪个?
public class SmsConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("sms接收到的信息是:->"+message);
    }
}

wechat

package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: WechatConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"wechat.ttl.queue"})  // 这个客户端的队列是哪个?
public class WechatConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("wechat接收到的信息是:->"+message);
    }
}

在我们的有效期内会在正常的交换机中

时间过期或者长度上限会进入我们的死信队列

在有效期内,我们消费者能够进行正常的消费...

4. 内存磁盘的监控

(1).RabbitMQ内存警告

当内存使用超过配置或者磁盘空间对于配置的阀值时,RabbitMQ会暂时阻塞客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。

(2).RabbitMQ的内存控制

参考帮助文档:https://rabbitmq.com/configure.html

当出现警告的时候,可以通过配置去修改和调整

命令的方式

下面的方式我们选择其一就行了,不是全部选举

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效.

配置文件Rabbitmq.conf

#默认 : /etc/rabbitmq/rabbitmq.conf  ->手动安装
vm_memory_high_watermark.relative=0.4
#使用relative相对值设置fraction,建议在0.4-0.7之间
vm_memory_high_watermark.absolute=2GB
(3).RabbitMQ的内存换页

在某个Broker节点及内存阻赛生产者之前,它会尝试将队列中的消息换页到碰盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在碰盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阔值是50%时就会换页处理。也就是说,在默认情况下该内存的闻值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作,

比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中,从而达到稳健的运行,

可以通过设置 vm_memory_high_watermark_paging_ratio 来进行调整

vm_memory_high_watermark.relative=0.4
vm_memory_high_watermark_paging_ratio=0.7 (小于1)

因为我们设置1,整个电脑的内存已经全部属于我们的RabbitMQ了,所以在设置分页已经没有什么意义了。

(4).RabbitMQ的磁盘预警

当磁盘的剩余空间低于确定的成值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽E盘空间导致服务器崩清。

默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阳塞生产者并且停止内存消息换页到磁盘的过程。

这个闻值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第次检查是:60MB,第二检查可能就是1MB,就会出现警告。

通过命令方式进行修改

rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
负载均衡 应用服务中间件 nginx
5分钟搞懂Ingress / IngressController / IngressClass的区别
先来个一句话总结:Ingress由Ingress规则、IngressController、IngressClass这3部分组成。Ingress资源只是一系列路由转发配置,必须使用IngressController才能让路由规则生效,而IngressClass是IngressController的具体实现。使用原则:先部署IngressController → 再部署Ingress资源。
21380 0
5分钟搞懂Ingress / IngressController / IngressClass的区别
|
Rust 安全 前端开发
为什么 Rust 备受开发者青睐?
在本篇文章中,作者介绍了 Rust 是什么,它的历史以及 Rust 是如何备受开发者和行业的青睐。希望本篇文章能帮助读者对 Rust 这门语言有一个大概的了解。
137707 43
Hertz中的CORS问题。
字节跳动开源框架Hertz,可能存在的CORS的跨域问题
|
Android开发 数据格式 XML
Android FrameLayout子view居中(左居中,右居中)等
Android的布局FrameLayout默认是把布局内的子view堆砌在左上角,但是,可以通过设置子view的: android:layout_gravity 此参数控制子view的布局位置,实现FrameLayou...
2422 0
|
Linux 数据安全/隐私保护
百度搜索:蓝易云【Centos7系统路由追踪安装使用教程。】
CentOS 7是一种常用的Linux操作系统,它具有广泛的应用和用户群体。路由追踪是一种网络工具,用于确定到达目标主机的网络数据包路径。
138 0
|
人工智能 开发者
万物皆可AIGC,免费算力等你来
人人都可以玩转AIGC! 本次活动广泛征集运用阿里云产品进行AIGC创作的作品,无论你是小白还是资深开发者,都可以来活动页领取免费算力,根据教程或任意组合阿里云免费资源进行AIGC创作,云端释放无限创意!
43633 189
|
算法 架构师 安全
需求分析和常见的需求问题解决
需求分析和常见的需求问题解决
110995 17
|
存储 Java 数据库
zookeeper深入浅出 2
zookeeper深入浅出
78 0
|
Java 数据库连接 数据库
Spring 应用如何访问数据库,看这一篇就够了!
当我们开发应用时,访问数据库是一种常见的需求。 基本上所有需要持久化的数据,一般都存储在数据库中,例如常用的开源数据库 MySQL。 在今天的文章中,我将盘点一下 Java 应用访问数据的几种方式。
34182 9
|
Ubuntu NoSQL Linux
一文讲明Docker的基本使用,常见Docker命令使用 、Docker的安装使用等
1、Docker的基本概念 2、常用的Docker命令 3、虚拟机安装Docker