MQ的一些常见问题
消息可靠性问题:如何确保发送的消息至少被消费一次
延迟消息问题:如何实现消息的延迟投递
高可用问题:如何避免单点的MQ故障而导致的不可用问题
消息堆积问题:如何解决百万消息堆积,无法及时消费的问题
消息可靠性问题
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
发送时丢失:
生产者发送的消息未送达exchage
消息到达exchange后未到达queue
MQ宕机,queue将消息丢失
consumer接收到消息后未消息就宕机
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
publisher发送消息给交换机(exchange),交换机再发送给队列
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
配置说明:
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
每个RabbiTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
SpringAMQP实现生产者确认
1.在publisher这个微服务的application.yml中添加配置
配置说明:
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturCallback;false:则直接丢弃消息
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
总结
SpringAMQP中处理消息确认的几种情况:
publisher-comfirm:
消息成功发送到exchange,返回ack
消息发送失败,没有到达交换机,返回nack
消息发送过程中出现异常,没有收到回执
消息成功发送到exchange,但没有路由到queue
调用ReturnCallbback
消费者确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring检测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭cak,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
消费者失败重试
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力;
我们可以利用spring的retru机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
republishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
测试下RepublishMessageRecoverer处理模式:
首先,定义接收失败消息的交换机、队列及其绑定关系:
然后,定义RepublishMessageRecoverer
如何确保RabbitMQ消息的可靠性?
开启生产者确认机制,确保生产者的消息能到达队列
开启持久化功能,确保消息未消费前在队列不会丢失
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。
死信交换机&消息堆积
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机。
什么样的消息会成为死信?
消息被消费者reject或者返回nack
消息超时未消费
队列满了
如何给队列绑定死信交换机?
给队列设置dead-letter-exchange属性,指定一个交换机
给队列设置dead-letter-routing-key属性,设置死信交换
机与死信队列的Routingkey
TTL
TTL,也就是Time-To—Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
消息所在的队列设置了存活时间
消息本身设置了存活时间
消息超时的两种方式是?
给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
给消息设置ttl属性,队列接收到的消息超过ttl时间变为死信
两者共存时,以时间短的ttl为准
如何实现发送一个消息20秒后消费者才收到消息?
给消息的目标队列指定死信交换机
消费者监听与死信交换机绑定的队列
发送消息时给消息设置ttl为20秒
延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
延迟发送短信
用户下单,如果用户在15分钟内未支付,则自动取消
预约工作会议,20分钟后自动通知所有参会人员
延迟队列插件
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
详细安装过程参考课前资料文档
SpringAMQP使用延迟队列插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
基于注解方式:
总结:延迟队列插件的使用步骤包括哪些?
声明一个交换机,添加delayed属性为true
发送消息时,添加x-delay头,值为超时时间
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种思路:
增加更多消费者,提高消费速度
在消费者内开启线程池加快消息处理速度
扩大队列容积,提高堆积上限
惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存
支持数百万条的消息存储
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
惰性队列
用SpringAMQP声明惰性队列分两种方式:
消息堆积问题的解决方案?
队列上绑定多个消费者,提高消费速度
给消费者开启线程池,提高消费速度
使用惰性队列,可以在mq中保存更多消息
惰性队列的优点有哪些?
基于磁盘存储,消息上限高
没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
基于磁盘存储,消息时效性会降低
性能受限于磁盘的IO
当一个队列中的消息满足下列情况之一时,可以成为死信:
消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息积满了,最早的消息可能成为死信