6.RabbitMQ死信队列
6.1.什么是TTL
- time to live消息存活时间
- 如果消息在存活时间内未被消费,就会清除
- RabbitMQ支持两种TTL
- 单独消息进行配置ttl
- 个队列进行配置ttl
6.2.什么是RabbitMQ的死信队列
- 没有被及时消费的消息存放的队列
6.3.什么是RabbitMQ的死信交换机
- Dead Letter Exchange(死信交换机,缩写:DLX)当消息过期未被消费后,会通过死信交换机,转发到死信队列,这个交换机就是DLX死信交换机
v
6.4.消息有哪几种情况成为死信
- 消费者拒收消息(basicNack或者basicReject),并且没有重新入队requeue=false
- 消息在队列中未被消费,且到了过期时间的消息(TTL)
- 队列长度达到极限后,如果绑定死信交换机和死信队列就会被投放到死信队列
6.5.什么是延迟队列
- 一种带有延迟功能的消息队列,Producer将消息发送到消息队列的服务端,但并不希望该条消息立马投递,而是推迟到当前时间的之后的一个时间Consumer进行消费,也可以叫定时消息
使用场景
- 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
- 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送
- 订单超时未支付关闭订单场景
业界一些实现方式
- 定时任务高精度轮回
- 采用RocketMQ自带延迟消息功能
- RabbitMQ结合死信交换机死信队列做到延迟消息
6.6.死信队列+死信交换机实现延迟消息
/** * mq配置类 * 新商家上架->new_merchant_queue->死信交换机->死信队列 */ @Configuration public class RabbitMQConfig { /** * 死信队列 */ public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue"; /** * 死信交换机 */ public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange"; /** * 进入死信队列的路由key */ public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key"; /** * 创建死信交换机 * @return */ @Bean public Exchange lockMerchantDeadExchange(){ return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false); } /** * 创建死信队列 * @return */ @Bean public Queue lockMerchantDeadQueue(){ return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build(); } /** * 绑定死信交换机和死信队列 * @return */ @Bean public Binding lockMerchantBinding(){ return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE, LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null); } /** * 普通队列,绑定的个死信交换机 */ public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue"; /** * 普通的topic交换机 */ public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange"; /** * 路由key */ public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key"; /** * 创建普通交换机 * @return */ @Bean public Exchange newMerchantExchange(){ return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false); } /** * 创建普通队列 * @return */ @Bean public Queue newMerchantQueue(){ Map<String,Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl",20000); return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build(); } /** * 绑定交换机和队列 * @return */ @Bean public Binding newMerchantBinding(){ return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE, NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null); } }
@Component @RabbitListener(queues = RabbitMQConfig.LOCK_MERCHANT_DEAD_QUEUE) public class OrderMQListener { /** * 处理器,适配器,加上@RabbitHandler注解 */ @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long tag = message.getMessageProperties().getDeliveryTag(); System.out.println("message:"+message.toString()); System.out.println("=============="); System.out.println("消息标识tag:"+tag); System.out.println("消息体body:"+body); //如果发生异常就重新入队 try{ channel.basicAck(tag,false); }catch (Exception e){ //拒收消息,重新入队 channel.basicNack(tag,false,true); } } }
7.RabbitMQ高可用集群模式
7.1.RabbitMQ集群模式介绍
普通集群
- 默认的集群模式,普通集群模式下同步交换机、队列、虚拟主机元数据,不同步消息信息,当消费者去B节点访问数据,数据在A节点,这时A节点先把数据转发给B节点,消费者在从B节点中pull消息。
- 存在问题:假如存在消息的节点宕机了,那么消费者想要消费这个消息,就要等当前节点恢复后才可以恢复正常,如果没有做消息持久化,则消息会丢失
镜像集群
- 队列做成镜像队列,让队列存在于各个节点中,和普通集群比较大的区别就是queue的message在各个节点之间同步,并不是在consumer获取时拉去,转发。
- 存在问题:由于镜像队列模式下,消息数量过去,大量的消息同步也会加大网络带宽的开销,适合高可用的项目,多节点性能会收到影响。
注意:集群需要保证各个节点有相同的token令牌,集群内各个节点的erlang.cookie需要相同,才可以相互通信
7.2.RabbitMQ搭建普通集群
准备三个mq节点
#节点一,主节点,创建-v映射目录 docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
#节点二,创建-v映射目录 docker run -d --hostname rabbit_host2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
#节点三,创建-v映射目录 docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
参数说明:
--hostname:自定义Docker容器的hostname --link:容器之间连接,link不可缺,使得三个容器相互通信 --privileged=true:使用该参数,container内的root拥有真正的root权限,否则容器出现perission denied -v:宿主机和容器路径映射 RABBITMQ_DEFAULT_USER=admin:配置用户名 RABBITMQ_DEFAULT_PASS=123456:配置密码 Erlang Cookie值必须相同,相当于不同节点间通信的密钥。
配置集群:
#节点一配置集群 docker exec -it rabbitmq1 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app exit #节点二假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点 docker exce -it rabbitmq2 bash rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit_host1 exit #节点三假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点 docker exce -it rabbitmq3 bash rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbit_host1 exit #查看集群节点状态,配置启动三个节点,1个磁盘节点和2个内存节点 rabbitmqctl cluster_status
访问节点1的web控制台,可以看到多个节点
测试:node1节点创建队列,发送消息(可以选择消息的持久化,Spring-AMPQ中默认就是持久化),node2和node3都可以看到消息同步,kill掉node1节点,发现node2和node3为NaN模式,如果是非主节点创建队列和发送消息,则其他队列也可以显示。
7.3.RabbitMQ高可用mirror镜像集群配置策略
背景:解决普通集群主节点宕机出现数据丢失的现象
RabbitMQ的策略policy是用来控制和修改集群的bhost队列和Exchange复制行为,就是要设置那些Exchange或者queue的数据需要复制、同步,以及如何复制同步
创建一个策略来匹配队列
路径:rabbitmq管理界面->Admin->Policies->Add/update a policy
参数:策略会同步一个VirtualHost中的交换机和队列数据
name:自定义策略名称 Pattern:^匹配符,代表匹配所有 Definition:ha-mode=all为匹配类型,分为3中模式
ha-model:指明镜像队列的模式,可选下面一种 all:表示在集群中所有的节点上进行镜像同步 exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定 nodes:表示在指定的节点上进行镜像同步,界定啊名称由ha-params指定 ha-sync-mode:镜像消息同步方式automatic(自动),manually(手动)
配置好后,+2的意思就是由三个节点,一个节点本身和两个镜像节点
集群重启顺序
- 集群重启的顺序是固定的,并且是相反的
- 启动顺序:磁盘节点=》内存节点
- 关闭顺序:内存节点=》磁盘节点
7.4.SpringBoot配置RabbitMQ集群
把host和port节点去掉换成addresses :10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674