什么是RabbitMQ
使用AMQP高级队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦
什么是AMQP
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个进程间传递异步消息的网络协议。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP在消息提供者和客户端的行为进行了强制规定,使得不同卖商之间真正实现了互操作能力。
为什么要使用RabbitMQ(优点)
异步:主流程只需要完成业务的核心功能;对于业务非核心功能将消息放入消息队列之中进行异步处理,减少请求的等待,提高系统的总体性能
解耦:将系统按照不同的业务功能来拆分出来,消费生产者只管把消息发布到MQ中而不用管谁来取,消息消费者只管从MQ中取消息而不管是谁发布的,消息生产者和消费者都不知道对方的存在
削峰/限流:将所有的请求都写到消息队列中,消费服务器按照自身能够处理的请求数从队列中拿到请求,防止请求并发过高将系统搞崩溃
RabbitMQ的缺点
系统的可用性降低: 系统引用的外部依赖越多,越容易挂掉,如果MQ服务器挂掉,那么可能导致整套系统崩溃
系统的复杂度提高: 加入消息队列之后,需要保证消息没有重复消费、如果处理消息丢失、有序性等问题
数据一致性问题: A系统处理完直接返回成功,使用者都以为你这个请求就成功了但是问题是,要是BCD系统哪里,BD系统写库成功了,结果C系统写库失败了,就会导致数据不一致
Publisher:生产者
Consumer:消费者
Broker:表示消息队列的服务器实体
Queue:消息队列,用于存放消息
Exchange:交换器,接受生产者发送的消息,根据路由键路由到指定的队列
Routing Key:路由关键字,用于指定这个消息的路由规则,需要与交换机类型和绑定键(binding-key)联合使用
Binding:绑定,通过绑定将交换机和队列关联起来,一般会指定一个bind-key,通过bind-key交换机就知道将消息路由给那个队列了
Connection:网络连接,用来连接到broker
Channel:信道,AMQP命令都是在信道中进行的,不管是发布消息、订阅队列还是接受消息。因为建立和消耗TCP都是非常昂贵的开销,所有引入信道的概念复用一条TCP连接
Message:消息,由消息头和消息体组成,消息头属性包括路由键,优先级等
Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象
生产者生产消息的过程
1、producer先连接到Broker,建立Connection,开启一个信道
2、Producer 声明一个交换机设置好属性
3、Producer 声明一个队列设置好属性
4、Producer 通过绑定建将交换器和队列绑定起来
5、Producer 发送消息到Broker,其中包含路由建、交换器等信息
6、交换机通过路由键查找匹配的队列
7、如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者
8、关闭信道
消费者接受消息过程
1、推模式
推模式接收消息是最有效的一种消息处理方式,当消息到达RabbitMQ时候,RabbitMQ会自动地、不断地投递消息给匹配的i消费者,不需要消费端手动来拉取,推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息
优点
消息者总是有一堆在内存中待处理的消息,所以效率比较高
由于推模式的信息到达就给匹配的消费者所以实时性比较好,消费者能及时得到最新的消息
缺点
缓存区可能会溢出
2、拉模式
拉模式在消费者需要时才去消息中间件拉取消息
缺点
会增加消息延迟,降低系统吞吐量,由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性不高
如何保证消息不丢失,进行可靠传输
RabbitMQ提供事务机制和确认机制两种模式来确保生产者不丢失消息
事务机制
发送消息前,开启事务,然后发送消息,如果发送过程中出现异常,事务就会回滚,如果成功就提交事务
缺点
生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息吞吐量降低
确认机制
生产者将信道设置为confirm模式,一旦channel进入confirm模式,所以在该信道上发布的消息都会被指派一个ID,消息被broker接收到就会执行confirmCallback,如果是集群,需要所有的都收到才会调用confirmCallback,被broker接收到只能表示meaasge到达服务器,不能保证到达了queue。
如果没有未能投递到目标queue里将调用returnCall,记录下数据,定期巡查和纠错
消费者获取到数据,成功处理返回ack。
但是默认是自动确认消息模式,当消费者还在处理中,消费者就会返回ack,通知RabbitMQ已经收到了消息,然后RabbitMQ就会立即删除,但是如果消息者出现了异常没有处理掉消息就会丢失
所有采用手动确认模式,等到消息被真正消费之后,再发送一个确认信号,即使中途消息没有处理完,但是服务器宕机了,那么RabbitMQ就收不到ack就会继续发送这条消息
如何保证消息不被重复消费
1、改变业务逻辑,使得在重复消费时也不影响结果,例如使用乐观锁加个version字段
2、基于数据库的唯一主键约束。消费完消息后,在数据库中做一个insert操作,如果出现重复消费就会主键冲突
3、记录关键key,当消息过来时候,判断这个key是不是已经被处理过了,如果没处理就再进行下一步
如何保证消息的有序性
1、拆分queue,使得一个queue只对应一个消费者,由于MQ内部一般都能保证内部队列是先进先出的,所有把需要保持先后顺序的一组消息使用某种算法分配到同一个消息队列,然后使用一个消费者去消费该队列,这样就会保证消费者是按照顺序进行消费的,但是吞吐量会出现瓶颈
2、对于多线程消费同一个队列的情况,可以使用重试机制,比如一个操作发帖子,写评论,删除微博,这三个异步操作,如果一个消费者先执行删除微博,但是还没有发就一定会失败,等一段时间后再执行这个操作就行
如何处理消息堆积情况
1、找出该问题的原因
(消费者宕机、消费者能力不足、发送者流量过大、业务逻辑原因)
2、临时扩容
1、先修复消费者问题,确保其恢复消费速度,然后停掉消费者
2、创建原先N倍的queue,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中的积压的数据
3、然后使用N倍的机器来部署消费者,每个消费者来消费一个临时queue中的数据
4、等消费完之后,恢复最开始的部署,使用原先的消费者机器