3.死信队列 (接盘侠) DLX
(1).概述
支持队列TTL,不支持消息TTL.
DLX,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可.
(2).生产者
正常生产者
这里我们只负责正常生产者: 配置我们正常的非死信路由key
package com.jsxs.service; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.UUID; /** * @Author Jsxs * @Date 2023/4/2 11:24 * @PackageName:com.jsxs.service * @ClassName: OrderService * @Description: TODO : 分别给对应的路由Key发送消息. * @Version 1.0 */ @Service public class OrderService { @Resource // 获取rabbitMQ的服务 private RabbitTemplate rabbitTemplate; /** * * @param userId * @param productID * @param num */ public void makeOrder(String userId,String productID,int num){ //1. 生成订单 String orderID = UUID.randomUUID().toString().replace("-",""); System.out.println("订单号已经生产成功-"+orderID); //2. 设置交换机名字和路由 String exchangeName="ttl_message_order_producer"; //3. 发送消息 // 参数: (交换机、路由key或队列名、消息内容) rabbitTemplate.convertAndSend(exchangeName,"one","1"); rabbitTemplate.convertAndSend(exchangeName,"two","2"); rabbitTemplate.convertAndSend(exchangeName,"three","3"); rabbitTemplate.convertAndSend(exchangeName,"four","4"); } }
(3).配置文件-(非死信配置)
我们需要添加死信的交换机与死信的路由key。将非死信交换机与死信交换机做一个连接的操作。
package com.jsxs.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author Jsxs * @Date 2023/4/3 16:16 * @PackageName:com.jsxs.config * @ClassName: TTLRabbitMQConfig * @Description: TODO * @Version 1.0 */ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import java.util.HashMap; @Configuration public class TTLRabbitMQConfig { // 1. 声明注册direct模式的交换机 @Bean public DirectExchange directExchange(){ // (交换机的名字、是否持久化。是否自动删除) return new DirectExchange("ttl_message_order_producer",true,false); } // 2. 声明队列: 以及过期时间 @Bean public Queue SmsQueue(){ HashMap<String, Object> args = new HashMap<>(); // 下面的key需要向web界面去寻找.... // 设置队列的过期时间 args.put("x-message-ttl",5000); // 死信交换机 args.put("x-dead-letter-exchange","dead_order_producer"); // 死信路由key args.put("x-dead-letter-routing-key","dead_sms"); // fanout 不需要配置路由key return new Queue("sms.ttl.queue",true,false,false,args); } @Bean public Queue MessageQueue(){ HashMap<String, Object> args = new HashMap<>(); // 下面的key需要向web界面去寻找.... // 设置队列的过期时间 args.put("x-message-ttl",5000); // 死信交换机 args.put("x-dead-letter-exchange","dead_order_producer"); // 死信路由key args.put("x-dead-letter-routing-key","dead_message"); // fanout 不需要配置路由key return new Queue("message.ttl.queue",true,false,false,args); } // ---------------上面我们同时设置队列过期时间和消息过期时间-----下面我们设置仅消息过期时间 @Bean public Queue EmailQueue(){ HashMap<String, Object> args = new HashMap<>(); // 下面的key需要向web界面去寻找.... // 设置队列的过期时间 args.put("x-message-ttl",5000); // 死信交换机 args.put("x-dead-letter-exchange","dead_order_producer"); // 死信路由key args.put("x-dead-letter-routing-key","dead_email"); // fanout 不需要配置路由key return new Queue("email.ttl.queue",true,false,false,args); } @Bean public Queue WeChatQueue(){ HashMap<String, Object> args = new HashMap<>(); // 下面的key需要向web界面去寻找.... // 设置队列的过期时间 args.put("x-message-ttl",5000); // 死信交换机 args.put("x-dead-letter-exchange","dead_order_producer"); // 死信路由key args.put("x-dead-letter-routing-key","dead_wechat"); // fanout 不需要配置路由key return new Queue("wechat.ttl.queue",true,false,false,args); } // 3. 将队列与交换机进行绑定的操作 @Bean public Binding SmsBind(){ return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("one"); } @Bean public Binding MessageBind(){ return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("two"); } @Bean public Binding EmailBind(){ return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("three"); } @Bean public Binding WechatBind(){ return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("four"); } }
(4).配置文件-(死信配置)
package com.jsxs.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * 死信队列配置 */ @Configuration public class DeadRabbitMQConfig { // 1. 声明注册direct模式的交换机 @Bean public DirectExchange deadDirectExchange(){ // (交换机的名字、是否持久化。是否自动删除) return new DirectExchange("dead_order_producer",true,false); } // 2. 声明队列: 以及过期时间 @Bean public Queue deadSmsQueue(){ return new Queue("dead.sms.queue",true); } @Bean public Queue deadMessageQueue(){ return new Queue("dead.message.queue",true); } @Bean public Queue deadEmailQueue(){ return new Queue("dead.email.queue",true); } @Bean public Queue deadWeChatQueue(){ return new Queue("dead.wechat.queue",true); } // 3. 将队列与交换机进行绑定的操作 @Bean public Binding deadSmsBind(){ return BindingBuilder.bind(deadSmsQueue()).to(deadDirectExchange()).with("dead_sms"); } @Bean public Binding deadMessageBind(){ return BindingBuilder.bind(deadMessageQueue()).to(deadDirectExchange()).with("dead_message"); } @Bean public Binding deadEmailBind(){ return BindingBuilder.bind(deadEmailQueue()).to(deadDirectExchange()).with("dead_email"); } @Bean public Binding deadWechatBind(){ return BindingBuilder.bind(deadWeChatQueue()).to(deadDirectExchange()).with("dead_wechat"); } }
(5).消费者
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:45 * @PackageName:com.jsxs.service.faout * @ClassName: EmailConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"email.ttl.queue"}) // 这个客户端的队列是哪个? public class EmailConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("email接收到的信息是:->"+message); } }
message
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: MessageConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"message.ttl.queue"}) // 这个客户端的队列是哪个? public class MessageConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("Message接收到的信息是:->"+message); } }
sms
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: SmsConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"sms.ttl.queue"}) // 这个客户端的队列是哪个? public class SmsConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("sms接收到的信息是:->"+message); } }
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: WechatConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"wechat.ttl.queue"}) // 这个客户端的队列是哪个? public class WechatConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("wechat接收到的信息是:->"+message); } }
在我们的有效期内会在正常的交换机中
时间过期或者长度上限会进入我们的死信队列
在有效期内,我们消费者能够进行正常的消费...
4. 内存磁盘的监控
(1).RabbitMQ内存警告
当内存使用超过配置或者磁盘空间对于配置的阀值时,RabbitMQ会暂时阻塞客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
(2).RabbitMQ的内存控制
参考帮助文档:https://rabbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
命令的方式
下面的方式我们选择其一就行了,不是全部选举
rabbitmqctl set_vm_memory_high_watermark <fraction> rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效.
配置文件Rabbitmq.conf
#默认 : /etc/rabbitmq/rabbitmq.conf ->手动安装 vm_memory_high_watermark.relative=0.4 #使用relative相对值设置fraction,建议在0.4-0.7之间 vm_memory_high_watermark.absolute=2GB
(3).RabbitMQ的内存换页
在某个Broker节点及内存阻赛生产者之前,它会尝试将队列中的消息换页到碰盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在碰盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阔值是50%时就会换页处理。也就是说,在默认情况下该内存的闻值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作,
比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中,从而达到稳健的运行,
可以通过设置 vm_memory_high_watermark_paging_ratio
来进行调整
vm_memory_high_watermark.relative=0.4 vm_memory_high_watermark_paging_ratio=0.7 (小于1)
因为我们设置1,整个电脑的内存已经全部属于我们的RabbitMQ了,所以在设置分页已经没有什么意义了。
(4).RabbitMQ的磁盘预警
当磁盘的剩余空间低于确定的成值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽E盘空间导致服务器崩清。
默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阳塞生产者并且停止内存消息换页到磁盘的过程。
这个闻值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第次检查是:60MB,第二检查可能就是1MB,就会出现警告。
通过命令方式进行修改
rabbitmqctl set_disk_free_limit <disk_limit> rabbitmqctl set_disk_free_limit memory_limit <fraction>