消息堆积如何处理
本质上就是生产者速度大于消费者速度
生产者:限制生产者发送速率
broker:优先核心业务处理,降级处理 惰性队列(针对更多是消息持久化(记录在磁盘)的时候 ,按需从内存中加载,不是直接都加载到内存中)
消费者:消费者数量,提高消费者处理能力、多线程
异步
RabbitMQ消息的可靠性与MQTT消息可靠性
RabbitMQ
生产者确认机制:publish-confirm、publish-return确保消息一定到MQ
消息持久化:借助于springAMQP声明的交换机、队列、消息默认都是持久化,或者在浏览器界面化创建的时候,勾选Durable参数,也可以默认持久化
消费者确认+重试机制:当指定队列失败次数达到上限之后,可以将消息投递给死信队列,然后人工处理
死信队列绑定交换机的key不一定要与工作队列绑定交换机的key一样(工作队列->交换机->死信队列)
生产者:重试,确认机制
broker(交换机、队列):持久化
消费者:ack确认机制
生产者确认机制:
找不到交换机:publish-confirm 返回 nack
找不到队列 : publish -confirm 返回 ack publish-return 返回异常信息
没有问题:publish-confirm 返回 ack
消费者重试:默认情况下,如果一直消费不成功,消息会不断requeue(重入队)到队列,然后再重新获取 ,因此加了一个重试机制
重试机制:实现RepublishMessageRecoverer
,将失败消息投递到固定的交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。
持久化三要素:协同工作保障消息可靠性
组件 |
作用 |
持久化设置 |
注意事项 |
交换机持久化 |
确保交换机在 RabbitMQ 重启后仍然存在 |
(声明交换机时) |
非持久化交换机会在重启后丢失 |
队列持久化 |
确保队列在 RabbitMQ 重启后仍然存在 |
(声明队列时) |
非持久化队列会在重启后丢失 |
消息持久化 |
确保消息内容在 RabbitMQ 重启后仍然可用 |
(发送消息时) |
即使队列持久化,消息本身也必须标记为持久化 |
MQTT
- QoS 0(最多一次):
- 用于无关紧要的数据(如周期性温度上报,丢包不影响业务)。
- QoS 1(至少一次):
- 90% 的推荐场景(如设备状态更新),配合业务幂等性成本最低。
- QoS 2(恰好一次):
- 仅用于强一致性需求(如订单状态变更),注意性能损耗。
延迟消息实现(针对的是订单超时自动取消)
方案以:死信队列+ttl(消息设置存活时间,时间一到就可以进行投递到死信队列,进行专门消费者执行)
方案二:延迟队列
对于时间的设置要选择,比如30分钟取消,可以设置延迟时间35min(避免卡点行为)
如何保证MQ幂等性?或 如何防止消息重复消费?
设置消息唯一ID、或者根据消息状态判断
MQ有什么应用场景?
异步处理、系统解耦、流量削峰、日志收集与分析(ELK架构)、分布式事务最终一致性、延迟消息与定时任务
RabbitMQ 的工作模型有哪些?
- 工作队列模型
消费者直接绑定到队列上。
一个队列可以绑定一个或多个消费者,多个消费者绑定到一个队列会共同消费队列中的消息,提高消费能力避免消息堆积。
2.发布订阅模型
发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。
发布订阅模型中通过交换机有不同的类型,完成将消息推送到队列:
Fanout:广播类型,将消息交给所有绑定到交换机的队列。采用轮询的方式一次只能由一个消费者消费消息(默认状态)
Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列
Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符 对应的# (通配hash->匹配零个或多个单词)和*(通配符->仅匹配一个单词)