【消息队列】消息中间件RabbitMQ急速入门3

简介: 【消息队列】消息中间件RabbitMQ急速入门

6.RabbitMQ死信队列

6.1.什么是TTL

  • time to live消息存活时间
  • 如果消息在存活时间内未被消费,就会清除
  • RabbitMQ支持两种TTL
  • 单独消息进行配置ttl
  • 个队列进行配置ttl

6.2.什么是RabbitMQ的死信队列

  • 没有被及时消费的消息存放的队列

6.3.什么是RabbitMQ的死信交换机

  • Dead Letter Exchange(死信交换机,缩写:DLX)当消息过期未被消费后,会通过死信交换机,转发到死信队列,这个交换机就是DLX死信交换机

5efeea966a0a421faf2ea2ef97fcd815.jpegv

6.4.消息有哪几种情况成为死信

  • 消费者拒收消息(basicNack或者basicReject),并且没有重新入队requeue=false
  • 消息在队列中未被消费,且到了过期时间的消息(TTL)
  • 队列长度达到极限后,如果绑定死信交换机和死信队列就会被投放到死信队列

6.5.什么是延迟队列

  • 一种带有延迟功能的消息队列,Producer将消息发送到消息队列的服务端,但并不希望该条消息立马投递,而是推迟到当前时间的之后的一个时间Consumer进行消费,也可以叫定时消息

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送
  • 订单超时未支付关闭订单场景

业界一些实现方式

  • 定时任务高精度轮回
  • 采用RocketMQ自带延迟消息功能
  • RabbitMQ结合死信交换机死信队列做到延迟消息

6.6.死信队列+死信交换机实现延迟消息

e31b257727c040e5867d875e248764e2.jpeg

/**
 * mq配置类
 * 新商家上架->new_merchant_queue->死信交换机->死信队列
 */
@Configuration
public class RabbitMQConfig {
    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    /**
     * 进入死信队列的路由key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
    /**
     * 创建死信交换机
     * @return
     */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
    }
    /**
     * 创建死信队列
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue(){
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }
    /**
     * 绑定死信交换机和死信队列
     * @return
     */
    @Bean
    public Binding lockMerchantBinding(){
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
    }
    /**
     * 普通队列,绑定的个死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    /**
     * 普通的topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    /**
     * 路由key
     */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
    /**
     * 创建普通交换机
     * @return
     */
    @Bean
    public Exchange newMerchantExchange(){
        return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
    }
    /**
     * 创建普通队列
     * @return
     */
    @Bean
    public Queue newMerchantQueue(){
        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);
        //过期时间,单位毫秒
        args.put("x-message-ttl",20000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }
    /**
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding newMerchantBinding(){
        return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
    }
}
@Component
@RabbitListener(queues = RabbitMQConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {
    /**
     * 处理器,适配器,加上@RabbitHandler注解
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("message:"+message.toString());
        System.out.println("==============");
        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);
        //如果发生异常就重新入队
        try{
            channel.basicAck(tag,false);
        }catch (Exception e){
            //拒收消息,重新入队
            channel.basicNack(tag,false,true);
        }
    }
}

7.RabbitMQ高可用集群模式

7.1.RabbitMQ集群模式介绍

普通集群

  • 默认的集群模式,普通集群模式下同步交换机、队列、虚拟主机元数据,不同步消息信息,当消费者去B节点访问数据,数据在A节点,这时A节点先把数据转发给B节点,消费者在从B节点中pull消息。
  • 存在问题:假如存在消息的节点宕机了,那么消费者想要消费这个消息,就要等当前节点恢复后才可以恢复正常,如果没有做消息持久化,则消息会丢失

镜像集群

  • 队列做成镜像队列,让队列存在于各个节点中,和普通集群比较大的区别就是queue的message在各个节点之间同步,并不是在consumer获取时拉去,转发。
  • 存在问题:由于镜像队列模式下,消息数量过去,大量的消息同步也会加大网络带宽的开销,适合高可用的项目,多节点性能会收到影响。

注意:集群需要保证各个节点有相同的token令牌,集群内各个节点的erlang.cookie需要相同,才可以相互通信

a8570ae761bb446390715cada140207d.jpeg

7.2.RabbitMQ搭建普通集群

准备三个mq节点

#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168  -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2  -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management

参数说明:

--hostname:自定义Docker容器的hostname
--link:容器之间连接,link不可缺,使得三个容器相互通信
--privileged=true:使用该参数,container内的root拥有真正的root权限,否则容器出现perission denied
-v:宿主机和容器路径映射
RABBITMQ_DEFAULT_USER=admin:配置用户名
RABBITMQ_DEFAULT_PASS=123456:配置密码
Erlang Cookie值必须相同,相当于不同节点间通信的密钥。

配置集群:

#节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
#节点二假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#节点三假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#查看集群节点状态,配置启动三个节点,1个磁盘节点和2个内存节点
rabbitmqctl cluster_status

访问节点1的web控制台,可以看到多个节点

6a494a72528847ae9458ef66c5bde606.jpeg

测试:node1节点创建队列,发送消息(可以选择消息的持久化,Spring-AMPQ中默认就是持久化),node2和node3都可以看到消息同步,kill掉node1节点,发现node2和node3为NaN模式,如果是非主节点创建队列和发送消息,则其他队列也可以显示。

7.3.RabbitMQ高可用mirror镜像集群配置策略

背景:解决普通集群主节点宕机出现数据丢失的现象

RabbitMQ的策略policy是用来控制和修改集群的bhost队列和Exchange复制行为,就是要设置那些Exchange或者queue的数据需要复制、同步,以及如何复制同步

创建一个策略来匹配队列

路径:rabbitmq管理界面->Admin->Policies->Add/update a policy

参数:策略会同步一个VirtualHost中的交换机和队列数据

name:自定义策略名称
Pattern:^匹配符,代表匹配所有
Definition:ha-mode=all为匹配类型,分为3中模式
ha-model:指明镜像队列的模式,可选下面一种
all:表示在集群中所有的节点上进行镜像同步
exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像同步,界定啊名称由ha-params指定
ha-sync-mode:镜像消息同步方式automatic(自动),manually(手动)

aa58945d606c4b3da893ab05e502d318.jpeg

配置好后,+2的意思就是由三个节点,一个节点本身和两个镜像节点

7a63044de35844f6a20566811e2ff9cd.jpeg

集群重启顺序

  • 集群重启的顺序是固定的,并且是相反的
  • 启动顺序:磁盘节点=》内存节点
  • 关闭顺序:内存节点=》磁盘节点

7.4.SpringBoot配置RabbitMQ集群

把host和port节点去掉换成addresses :10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
27天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
85 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
26天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 5
|
20天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
20天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
24天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 6
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章