简介
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
产生背景
1、消息队列系统最在可以追溯到上个世纪(是不是感觉很久远,其实是1983年,那时候我还没用出生)。1983年最早的消息队列软件Teknekron诞生,当时紧用于一些金融交易等系统。
2、上世纪九十年代,诞生了多家消息队列系统,例如IBM MQ、微软的MSMQ、TIBCO MQ等消息队列在企业中的应用也愈加广泛。显然这些商用的消息队列系统如果企业要使用需要付出高昂的成本,并且各个消息队列之间使用不同的API不同的协议。
3、2004年,AMQP(Advanced Message Queuing Protocol,高级消息队列协议)开始开发。通过这一标准可以和任意AMQP供应商提供的MQ服务进行交互。
4、2006年,光阴荏苒时光如梭,一转眼就说到了重点。我们的主角使用Erlang语言实现的AMQP开源版本,RabbitMQ诞生了,同年AMQP协议首次发布。
优缺点
优点
- 应用异步
考虑定外卖支付成功的情况
支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长
这样就造成整条调用链路响应非常缓慢
而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右
寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
- 应用解耦
假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可
但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难
这是由于服务之间耦合度过于紧密
再来考虑用RabbitMQ解耦的情况
A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
- 流量削峰
假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对
而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力
但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
这是消息队列服务器非常典型的应用场景
缺点
- 系统可用性降低:系统引入外部依赖越多,越容易挂掉。本来 A 系统调用 BCD 系统好好的,加一个 MQ 统一连接 BCD 系统,万一 MQ 挂掉,整套系统就崩溃了。见下图所示
- 系统复杂度提高:在整套系统中硬生生的加入 MQ,对于保证没有消息重复消费的问题、对于保证消息传递顺序性问题、对于处理消息丢失的问题都比较困难
- 一致性问题:可能存在 A 系统处理完请求直接返回成功,但是在 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,造成数据不一致问题。
核心概念
- Server: 又称Broker, 接受客户端的连接,实现AMQP实体服务
- Connection: 连接,应用程序与Broker的网络连接
- Channel: 网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
- Message: 消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
- Virtual host: 虚拟主机,用于进行逻辑隔离,就有点类似于NameSpace或Group的概念,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
- Exchange: 交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding: Exchange和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息
- Queue: 也称为Message Queue,消息队列,保存消息并将它们转发给消费者
工作模式
simple简单模式
- 消息产生消息,将消息放入队列。
- 消息的消费者(consumer)监听消息队列,如果队列中有消息就消费掉,消息被拿走后自动从队列中删除(隐患:消息可能没有被消费者正确处理,已经从队列中消失了造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
work工作模式(资源的竞争)
- 消息生产者将消息放入队列消费者可以有多个,消费者1、消费者2同时监听同一个队列,消息被消费。
- C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize)保证一条消息只能被一个消费者使用)。
publish/subscribe发布订阅模式(共享资源)
- 每个消费者监听自己的队列。
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
routing路由模式
- 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
- 根据业务功能定义路由字符串。
- 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
- 业务场景:error 通知、EXCEPTION、错误通知的功能、传统意义的错误通知、客户通知。利用key路由可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。
topic主题模式(路由模式的一种)
- 星号井号代表通配符。
- 星号代表多个单词,井号代表一个单词。
- 路由功能添加模糊匹配。
- 消息产生者产生消息,把消息交给交换机。
- 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)。
上千万条消息在MQ中积压怎么解决?
产生背景:
- 生产者投递消息的速率与我们消费者消费的速率完全不匹配。
- 生产者投递消息的速率 > 消费者消费的速率,导致我们消息会堆积在服务器端中,如果没有及时的被消费者消费,就会产生消息堆积的问题。
注意的是:rabbitmq 消费者消息消费成功的话,消息会被立即删除。
解决方案:
方案一:通常的解决方案就是增加消费端实例。说白了就是增加机器。如果出现线上事故,能申请多少机器就申请多少机器,争取在最短的时间内消费掉积压在MQ中的消息。
方案二:如果申请机器行不通,毕竟公司的机器是有限的,此时可以增加消费端的消费能力。在MQ的配置中配置“最大消费者数量”与“每次从队列中获取的消息数量”。
方案三:如果还是不能解决问题的话,紧急上线专门用于记录消息的队列,先把MQ中的消息记录到数据库中,丢西无关紧要的消息,然后再慢慢的消化处理。