RabbitMQ是一种基于erlang语言开发的流行的开源消息中间件,或者说是一个消息队列系统。它是对AMQP协议的实现,支持多种客户端,可以对来自客户端的异步消息进行存储转发,在易用性、扩展性、高可用性等方面表现不俗。适用于解耦、削峰、消息通讯等消息队列场景。
RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列.而且生产者只能向交换机发送消息。
基本概念
Server:接收客户端的连接,实现AMQP实体服务。
Connection:连接,应用程序与Server的网络连接,TCP连接。
Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
Queue:消息队列,用来保存消息,供消费者消费。
保证消息不丢失
(1)生产者发送消息到RabbitMQ服务器的过程中出现消息丢失。可能是网络波动未收到消息,又或者是服务器宕机。
(2)RabbitMQ服务器消息持久化出现消息丢失。消息发送到 RabbitMQ之后,未能及时存储完成持久化,RabbitMQ服务器出现宕机重启,消息出现丢失。
(3)消费者拉取消息过程以及拿到消息后出现消息丢失。消费者从RabbitMQ服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。
针对上述三种消息丢失场景,RabbitMQ提供了相应的解决方案,confirm 消息确认机制(生产者),消息持久化机制(RabbitMQ 服务),ACK 事务机制 (消费者)
消息不被重复消费
生产者:生产者可能会重复推送一条数据到 MQ中,比如Controller接口被重复调用了2次,没有做接口幂等性导致的;
MQ:在消费者消费完准备响应ack 消息消费成功时,MC突然挂了,导致MQ以为消费者还未消费该条数据,MQ恢复后再次推送了该条消息,导致了重复消费
消费者:消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后MQ以为消费者还没有消费该消息,再次推送了该条消息。
方案:
(1)使用数据库唯一键约束(局限性,仅能用在数据新增场景)
(2)使用乐观锁,发送消息时带上修改字段的版本号
(3)插入消费记录,增加数据库判断
消息堆积
(1)消费者处理消息的速度太慢-如果消费者处理消息的速度太慢,那么消息就会在队列中堆积。-》水平扩展增加消费者数量/优化消费者性能/消息预取限制
(2)队列的容量太小-如果队列的容量太小,那么消息也会在队列中堆积。-》增加队列容量
(3)网络故障-如果网络发生故障,那么消息可能会丢失,导致消息在队列中堆积。-》监控和告警/持久化和高可用性
(4)消费者故障-如果消费者发生故障,那么消息也会在队列中堆积。-》使用死信队列(将无法处理的消息转到死信队列)/实现容错机制
(5)队列配置不当-错误配置的队列(如错误的消息确认模式或队列长度限制)可能导致处理速度不佳。-》优化队列配置(检查并优化消息确认模式、队列长度限制等)
(6)消息大小-大型消息处理时间较长,可能导致处理效率降低。-》消息分片
(7)业务逻辑复杂或耗时-如果消费者在处理消息时需要执行复杂的业务逻辑或耗时的操作,处理速度会受到影响。-》优化业务逻辑
(8)消息产生速度快于消费速度-未处理消息会在队列中累计。-》使用消息限流/负载均衡
交换机类型:
(1)直连交换机
(2)Fanout扇出交换机
(3)Topic主题交换机
(4)Header交换机