【消息队列】消息中间件RabbitMQ急速入门3

简介: 【消息队列】消息中间件RabbitMQ急速入门

6.RabbitMQ死信队列

6.1.什么是TTL

  • time to live消息存活时间
  • 如果消息在存活时间内未被消费,就会清除
  • RabbitMQ支持两种TTL
  • 单独消息进行配置ttl
  • 个队列进行配置ttl

6.2.什么是RabbitMQ的死信队列

  • 没有被及时消费的消息存放的队列

6.3.什么是RabbitMQ的死信交换机

  • Dead Letter Exchange(死信交换机,缩写:DLX)当消息过期未被消费后,会通过死信交换机,转发到死信队列,这个交换机就是DLX死信交换机

5efeea966a0a421faf2ea2ef97fcd815.jpegv

6.4.消息有哪几种情况成为死信

  • 消费者拒收消息(basicNack或者basicReject),并且没有重新入队requeue=false
  • 消息在队列中未被消费,且到了过期时间的消息(TTL)
  • 队列长度达到极限后,如果绑定死信交换机和死信队列就会被投放到死信队列

6.5.什么是延迟队列

  • 一种带有延迟功能的消息队列,Producer将消息发送到消息队列的服务端,但并不希望该条消息立马投递,而是推迟到当前时间的之后的一个时间Consumer进行消费,也可以叫定时消息

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送
  • 订单超时未支付关闭订单场景

业界一些实现方式

  • 定时任务高精度轮回
  • 采用RocketMQ自带延迟消息功能
  • RabbitMQ结合死信交换机死信队列做到延迟消息

6.6.死信队列+死信交换机实现延迟消息

e31b257727c040e5867d875e248764e2.jpeg

/**
 * mq配置类
 * 新商家上架->new_merchant_queue->死信交换机->死信队列
 */
@Configuration
public class RabbitMQConfig {
    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    /**
     * 进入死信队列的路由key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
    /**
     * 创建死信交换机
     * @return
     */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
    }
    /**
     * 创建死信队列
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue(){
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }
    /**
     * 绑定死信交换机和死信队列
     * @return
     */
    @Bean
    public Binding lockMerchantBinding(){
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
    }
    /**
     * 普通队列,绑定的个死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    /**
     * 普通的topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    /**
     * 路由key
     */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
    /**
     * 创建普通交换机
     * @return
     */
    @Bean
    public Exchange newMerchantExchange(){
        return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
    }
    /**
     * 创建普通队列
     * @return
     */
    @Bean
    public Queue newMerchantQueue(){
        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);
        //过期时间,单位毫秒
        args.put("x-message-ttl",20000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }
    /**
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding newMerchantBinding(){
        return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
    }
}
@Component
@RabbitListener(queues = RabbitMQConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {
    /**
     * 处理器,适配器,加上@RabbitHandler注解
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("message:"+message.toString());
        System.out.println("==============");
        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);
        //如果发生异常就重新入队
        try{
            channel.basicAck(tag,false);
        }catch (Exception e){
            //拒收消息,重新入队
            channel.basicNack(tag,false,true);
        }
    }
}

7.RabbitMQ高可用集群模式

7.1.RabbitMQ集群模式介绍

普通集群

  • 默认的集群模式,普通集群模式下同步交换机、队列、虚拟主机元数据,不同步消息信息,当消费者去B节点访问数据,数据在A节点,这时A节点先把数据转发给B节点,消费者在从B节点中pull消息。
  • 存在问题:假如存在消息的节点宕机了,那么消费者想要消费这个消息,就要等当前节点恢复后才可以恢复正常,如果没有做消息持久化,则消息会丢失

镜像集群

  • 队列做成镜像队列,让队列存在于各个节点中,和普通集群比较大的区别就是queue的message在各个节点之间同步,并不是在consumer获取时拉去,转发。
  • 存在问题:由于镜像队列模式下,消息数量过去,大量的消息同步也会加大网络带宽的开销,适合高可用的项目,多节点性能会收到影响。

注意:集群需要保证各个节点有相同的token令牌,集群内各个节点的erlang.cookie需要相同,才可以相互通信

a8570ae761bb446390715cada140207d.jpeg

7.2.RabbitMQ搭建普通集群

准备三个mq节点

#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168  -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2  -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management

参数说明:

--hostname:自定义Docker容器的hostname
--link:容器之间连接,link不可缺,使得三个容器相互通信
--privileged=true:使用该参数,container内的root拥有真正的root权限,否则容器出现perission denied
-v:宿主机和容器路径映射
RABBITMQ_DEFAULT_USER=admin:配置用户名
RABBITMQ_DEFAULT_PASS=123456:配置密码
Erlang Cookie值必须相同,相当于不同节点间通信的密钥。

配置集群:

#节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
#节点二假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#节点三假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#查看集群节点状态,配置启动三个节点,1个磁盘节点和2个内存节点
rabbitmqctl cluster_status

访问节点1的web控制台,可以看到多个节点

6a494a72528847ae9458ef66c5bde606.jpeg

测试:node1节点创建队列,发送消息(可以选择消息的持久化,Spring-AMPQ中默认就是持久化),node2和node3都可以看到消息同步,kill掉node1节点,发现node2和node3为NaN模式,如果是非主节点创建队列和发送消息,则其他队列也可以显示。

7.3.RabbitMQ高可用mirror镜像集群配置策略

背景:解决普通集群主节点宕机出现数据丢失的现象

RabbitMQ的策略policy是用来控制和修改集群的bhost队列和Exchange复制行为,就是要设置那些Exchange或者queue的数据需要复制、同步,以及如何复制同步

创建一个策略来匹配队列

路径:rabbitmq管理界面->Admin->Policies->Add/update a policy

参数:策略会同步一个VirtualHost中的交换机和队列数据

name:自定义策略名称
Pattern:^匹配符,代表匹配所有
Definition:ha-mode=all为匹配类型,分为3中模式
ha-model:指明镜像队列的模式,可选下面一种
all:表示在集群中所有的节点上进行镜像同步
exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像同步,界定啊名称由ha-params指定
ha-sync-mode:镜像消息同步方式automatic(自动),manually(手动)

aa58945d606c4b3da893ab05e502d318.jpeg

配置好后,+2的意思就是由三个节点,一个节点本身和两个镜像节点

7a63044de35844f6a20566811e2ff9cd.jpeg

集群重启顺序

  • 集群重启的顺序是固定的,并且是相反的
  • 启动顺序:磁盘节点=》内存节点
  • 关闭顺序:内存节点=》磁盘节点

7.4.SpringBoot配置RabbitMQ集群

把host和port节点去掉换成addresses :10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
26天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
26天前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
26天前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
2月前
|
消息中间件
高弹性低成本的云消息队列 RabbitMQ 版陪跑班开课啦!
参与 高弹性低成本的云消息队列 RabbitMQ 版陪跑班 赢好礼
110 10
高弹性低成本的云消息队列 RabbitMQ 版陪跑班开课啦!
|
22天前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
25 0
分享一下rocketmq入门小知识
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
118 2
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
36 3
|
2月前
|
消息中间件 API 开发工具
消息队列 MQ使用问题之如何开启RabbitMQ的MQTT功能
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
30天前
|
消息中间件 网络架构
RabbitMQ消息队列常见面试题
这篇文章总结了RabbitMQ的常见面试题,涵盖了消息模型、使用场景、实现功能、消息幂等性、顺序性、堆积和丢失的避免方法,以及推模式和拉模式的区别。
42 0