AMQP协议
Advanced Message Queuing Protocol,应用层高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,不受客户端/中间件同产品、不同的开发语言等条件的限制。
RabbitMQ遵循了AMQP协议。
基本概念
Broker RabbitMQ服务器
Exchange 交换机,消息首先发送到指定exchange,然后按照routing key 路由到指定Queue
Queue 消息的载体,每条消息都会被投送到一个或多个队列中
Binding 绑定关系,将Exchange和Queue按照指定路由规则绑定起来
Routing 路由关键字,Exchange根据Routing Key进行消息投递
Vhost 一个Broker可以包含多个vhost,一个vhost包含一组Exchange、Queue和Binding等
Producer 生产者
Consumer 消费者
Connection Producer和Consumer与Broker之间的TCP长连接
Channel 信道,每个连接里可以建立多个Channel,每个Channel代表一个会话任务
交换机分类
Direct Exchange
直连交换机,与队列绑定时需要指定一个明确的binding key。发送消息到该交换机时,只有routing key跟binding key完全匹配,绑定的队列才能收到消息。
例如binding key为‘faith.account’,那么routing key 需也为‘faith.account’才能路由到指定队列。
Topic Exchange
主题交换机,与队列绑定时,可使用通配符定义routing key。
*代表匹配一个单词。#代表匹配零个或者多个单词。单词与单词之间用 . 隔开。
发送消息时,routing key符合binding key的模式时,绑定的队列才能收到消息。
例如:binding key 为‘#.faith.#’,那么routing key为‘faith.message’路由到该队列。
Fanout Exchange
广播交换机,与队列绑定时不需要指定binding key。
当消息发送到该类型的交换机时,所有与之绑定的队列都能收到消息。
死信
当消息:
消费者拒绝消息并且消息没有重新入队;
消息过期,通过设置消息的ttl(time to live)属性可以实现;
队列达到最大长度,此时最先入队的消息会被发送到DLX
时会进入DLX(Dead Letter Exchange)。
通过设置死信队列(Dead Letter Queue)与DLX绑定,可以接收死信,并通过监听该DLQ消费消息。
流控
参数设置
rabbitmq.config 文件中配置默认0.4的内存阈值,当rabbitmq占用内存超过40% 时会抛出内存警告并阻塞所有连接。
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
且默认剩余磁盘空间在 1GB 以下,也会主动阻塞所有的生产者。
消费端限流
设置prefetch值,例如1,表示当该消费者消费的消息有1条未被确认时,不进行新的消费。
prefetch没有默认值。如果没有设置,队列默认会把所有消息都发给消费者,在消费者没有ACK的情况下,发了多少就会产生多少Unacked。
如果prefetch是1,那么只要一条消息没有收到消费者的ACK,后续的消息都不会发送到这个消费者,造成消息堵塞。
可靠性投递
生产者到broker
这个阶段主要解决消息投递的可靠性,一般两种解决方案:Transaction(事务)模式和Confirm(确认)模式。
事务模式影响性能,一般使用confirm模式,消息正常到达exchange后会返回给生产者信息,以spring为例,开启需要如下配置:
<rabbit:connection-factory publisher-confirms="true" publisher-returns="true" />
<rabbit:template mandatory="true" />
publisher-confirms="true"
和publisher-returns="true"
,表示开启消息确认模式以及消息返回模式。
其次mandatory="true"
,表示如果exchange根据自身类型和消息routingKey无法路由到指定queue时,broker会调用basic.return方法将消息返还给生产者,当mandatory为false时,出现上述情况broker会直接将消息丢弃。
如果消息投递失败,则启用重复投递方式,例如投递5次,5次失败之后告警并存入DB中。
这里还可以使用try-catch,如代码执行失败,一样重试或存入DB。
需要注意的是事务机制和确认机制是互斥的。如果企图将已开启事务模式的信道再设置为publisher confirm模式,或者企图将已开启publisher confirm模式的信道设置为事务模式的话,RabbitMQ会报错:
cannot switch from tx to confirm mode
例如rabbitTemplate还设置了:channel-transacted="true"
会与确认机制配置发生冲突。
消息存储可靠性
宕机、重启、关闭等情况可能导致消息丢失。解决方案:
队列持久化
交换机持久化
消息持久化
集群,镜像队列
队列和交换机的持久化均可在对象声明时指定,消息的持久化可以在发送时指定。
消息消费时的可靠性
这一阶段可采用多种方式综合运用。
消息确认机制
使用消息确认机制(message acknowledgement),消费者订阅队列时可指定autoAck参数,当autoAck为false时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。
如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认。如果requeue参数为true,该消息可重新入队,以便重发。
如果不确认,会导致prefetch数量+1,如果prefetch为1,则导致该消费者阻塞,不再收到broker推送的消息。
消费者回调
消费者处理消息以后,可以再发送一条消息给生产者,或者调用生产者的API,告知消息处理完毕。
例如支付中异步通信的回执,多次交互。如支付宝支付后会回调支付发起应用的回调函数,并有失败重发机制。
补偿机制
对于一定时间没有得到响应的消息,可以设置一个定时重发的机制,但要控制次数,比如最多重发3次,否则会造成消息堆积。
例如支付宝回调失败重发就是补偿机制的应用。
RabbitMQ集群
集群主要用于实现高可用与负载均衡。
RabbitMQ通过/var/lib/rabbitmq/.erlang.cookie来验证身份,需要在所有节点上保持一致。
集群有两种节点类型,一种是磁盘节点,一种是内存节点。集群中至少需要一个磁盘节点以实现元数据的持久化,未指定类型的情况下,默认为磁盘节点。
集群模式有两种,一种是普通模式,普通模式中的queue内的消息实体只存在于
其中一个节点,其余节点中仅有相同的队列的结构。例如集群节点为A和B,消息实体在A中。当consumer从B节点消费时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,防止消息都从一个出口出来。
当A故障后,B节点无法取到A节点中的消息实体。如果做了消息持久化,那么A节点恢复后才能被消费;否则消息丢失。
集群另一种模式是镜像模式,该模式和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。但该模式副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的
网络带宽将会被这种同步通讯大大消耗,所以只在可靠性要求较高的场合中适用。
RabbitMQ镜像队列
集群方式下,队列和消息是无法在节点之间同步,因此需要使用RabbitMQ的镜像队列机制进行同步。
镜像队列机制能将queue镜像到cluster中其他的节点之上。如果集群中的一个节点失效了,queue能自动切换到镜像中的另一个节点以保证服务的可用性。
每个镜像队列都包含一个master和多个slave,分别对应于不同的节点。slave会按照master执行命令的顺序进行命令执行,故slave与master上维护的状态是相同的。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。
RabbitMQ的镜像队列支持publisher confirm和事务机制。事务机制中,只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息。同样的,在publisher confirm机制中,向publisher进行当前message确认的前提是该message被全部镜像所接受了。
镜像队列配置通过添加policy完成:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
示例:
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
或通过web管理界面添加:
消息分发机制
Round-Robin(轮询)
默认的策略,消费者轮流、平均地收到消息。
Fair dispatch (公平分发)
根据消费者的处理能力来分发消息,可以用basicQos(int prefetch_count)来设置。
prefetch_count:当消费者有多少条消息没有响应ACK时,不再给这个消费者发送消息。