rabbitmq

简介: AMQP协议 Advanced Message Queuing Protocol,应用层高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,不受客户端/中间件同产品、不同的开发语言等条件的限制。

AMQP协议

Advanced Message Queuing Protocol,应用层高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,不受客户端/中间件同产品、不同的开发语言等条件的限制。

RabbitMQ遵循了AMQP协议。

基本概念

1

Broker RabbitMQ服务器

Exchange 交换机,消息首先发送到指定exchange,然后按照routing key 路由到指定Queue

Queue 消息的载体,每条消息都会被投送到一个或多个队列中

Binding 绑定关系,将Exchange和Queue按照指定路由规则绑定起来

Routing 路由关键字,Exchange根据Routing Key进行消息投递

Vhost 一个Broker可以包含多个vhost,一个vhost包含一组Exchange、Queue和Binding等

Producer 生产者

Consumer 消费者

Connection Producer和Consumer与Broker之间的TCP长连接

Channel 信道,每个连接里可以建立多个Channel,每个Channel代表一个会话任务

交换机分类

Direct Exchange

直连交换机,与队列绑定时需要指定一个明确的binding key。发送消息到该交换机时,只有routing key跟binding key完全匹配,绑定的队列才能收到消息。

例如binding key为‘faith.account’,那么routing key 需也为‘faith.account’才能路由到指定队列。

Topic Exchange

主题交换机,与队列绑定时,可使用通配符定义routing key。
*代表匹配一个单词。#代表匹配零个或者多个单词。单词与单词之间用 . 隔开。

发送消息时,routing key符合binding key的模式时,绑定的队列才能收到消息。

例如:binding key 为‘#.faith.#’,那么routing key为‘faith.message’路由到该队列。

Fanout Exchange

广播交换机,与队列绑定时不需要指定binding key。

当消息发送到该类型的交换机时,所有与之绑定的队列都能收到消息。

死信

当消息:

消费者拒绝消息并且消息没有重新入队;

消息过期,通过设置消息的ttl(time to live)属性可以实现;

队列达到最大长度,此时最先入队的消息会被发送到DLX

时会进入DLX(Dead Letter Exchange)。

通过设置死信队列(Dead Letter Queue)与DLX绑定,可以接收死信,并通过监听该DLQ消费消息。

流控

参数设置

rabbitmq.config 文件中配置默认0.4的内存阈值,当rabbitmq占用内存超过40% 时会抛出内存警告并阻塞所有连接。

[{rabbit, [{vm_memory_high_watermark, 0.4}]}].

且默认剩余磁盘空间在 1GB 以下,也会主动阻塞所有的生产者。

消费端限流

设置prefetch值,例如1,表示当该消费者消费的消息有1条未被确认时,不进行新的消费。

prefetch没有默认值。如果没有设置,队列默认会把所有消息都发给消费者,在消费者没有ACK的情况下,发了多少就会产生多少Unacked。

如果prefetch是1,那么只要一条消息没有收到消费者的ACK,后续的消息都不会发送到这个消费者,造成消息堵塞。

可靠性投递

2

生产者到broker

这个阶段主要解决消息投递的可靠性,一般两种解决方案:Transaction(事务)模式和Confirm(确认)模式。

事务模式影响性能,一般使用confirm模式,消息正常到达exchange后会返回给生产者信息,以spring为例,开启需要如下配置:

<rabbit:connection-factory publisher-confirms="true" publisher-returns="true" />

<rabbit:template mandatory="true" />

publisher-confirms="true"publisher-returns="true",表示开启消息确认模式以及消息返回模式。

其次mandatory="true",表示如果exchange根据自身类型和消息routingKey无法路由到指定queue时,broker会调用basic.return方法将消息返还给生产者,当mandatory为false时,出现上述情况broker会直接将消息丢弃。

如果消息投递失败,则启用重复投递方式,例如投递5次,5次失败之后告警并存入DB中。

这里还可以使用try-catch,如代码执行失败,一样重试或存入DB。

需要注意的是事务机制和确认机制是互斥的。如果企图将已开启事务模式的信道再设置为publisher confirm模式,或者企图将已开启publisher confirm模式的信道设置为事务模式的话,RabbitMQ会报错:

cannot switch from tx to confirm mode

例如rabbitTemplate还设置了:channel-transacted="true"会与确认机制配置发生冲突。

消息存储可靠性

宕机、重启、关闭等情况可能导致消息丢失。解决方案:

队列持久化

交换机持久化

消息持久化

集群,镜像队列

队列和交换机的持久化均可在对象声明时指定,消息的持久化可以在发送时指定。

消息消费时的可靠性

这一阶段可采用多种方式综合运用。

消息确认机制

使用消息确认机制(message acknowledgement),消费者订阅队列时可指定autoAck参数,当autoAck为false时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。

如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认。如果requeue参数为true,该消息可重新入队,以便重发。

如果不确认,会导致prefetch数量+1,如果prefetch为1,则导致该消费者阻塞,不再收到broker推送的消息。

消费者回调

消费者处理消息以后,可以再发送一条消息给生产者,或者调用生产者的API,告知消息处理完毕。
例如支付中异步通信的回执,多次交互。如支付宝支付后会回调支付发起应用的回调函数,并有失败重发机制。

补偿机制

对于一定时间没有得到响应的消息,可以设置一个定时重发的机制,但要控制次数,比如最多重发3次,否则会造成消息堆积。

例如支付宝回调失败重发就是补偿机制的应用。

RabbitMQ集群

集群主要用于实现高可用与负载均衡。
RabbitMQ通过/var/lib/rabbitmq/.erlang.cookie来验证身份,需要在所有节点上保持一致。

集群有两种节点类型,一种是磁盘节点,一种是内存节点。集群中至少需要一个磁盘节点以实现元数据的持久化,未指定类型的情况下,默认为磁盘节点。

集群模式有两种,一种是普通模式,普通模式中的queue内的消息实体只存在于
其中一个节点,其余节点中仅有相同的队列的结构。例如集群节点为A和B,消息实体在A中。当consumer从B节点消费时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,防止消息都从一个出口出来。

当A故障后,B节点无法取到A节点中的消息实体。如果做了消息持久化,那么A节点恢复后才能被消费;否则消息丢失。

集群另一种模式是镜像模式,该模式和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。但该模式副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的
网络带宽将会被这种同步通讯大大消耗,所以只在可靠性要求较高的场合中适用。

RabbitMQ镜像队列

集群方式下,队列和消息是无法在节点之间同步,因此需要使用RabbitMQ的镜像队列机制进行同步。

镜像队列机制能将queue镜像到cluster中其他的节点之上。如果集群中的一个节点失效了,queue能自动切换到镜像中的另一个节点以保证服务的可用性。

每个镜像队列都包含一个master和多个slave,分别对应于不同的节点。slave会按照master执行命令的顺序进行命令执行,故slave与master上维护的状态是相同的。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。

RabbitMQ的镜像队列支持publisher confirm和事务机制。事务机制中,只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息。同样的,在publisher confirm机制中,向publisher进行当前message确认的前提是该message被全部镜像所接受了。

镜像队列配置通过添加policy完成:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
    ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
        all:表示在集群中所有的节点上进行镜像
        exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
        nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
    ha-params:ha-mode模式需要用到的参数
    ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级

示例:

rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

或通过web管理界面添加:

3

消息分发机制

Round-Robin(轮询)

默认的策略,消费者轮流、平均地收到消息。

Fair dispatch (公平分发)

根据消费者的处理能力来分发消息,可以用basicQos(int prefetch_count)来设置。

prefetch_count:当消费者有多少条消息没有响应ACK时,不再给这个消费者发送消息。
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 存储 负载均衡
|
6月前
|
消息中间件 存储 数据库
RabbitMQ特殊应用
RabbitMQ特殊应用
35 0
|
9月前
|
消息中间件 存储 网络协议
rabbitmq的介绍
rabbitMQ是一个开源的AMQP实现的消息队列中间件,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用。
|
9月前
|
消息中间件 存储 缓存
RabbitMQ到底为什么要使用它?
在多服务体系架构中,必然存在着多个服务之间的调用关系,当用户提交了订单,订单服务会调用支付服务执行用户的金钱操作,执行完毕之后紧接着调用商品服务对商家的商品信息(库存、成交量、收入等)进行更新,执行完毕之后又调用物流服务
|
10月前
|
消息中间件 存储 JSON
关于RabbitMQ
MQ是一种应用程序键一步通讯的技术,MQ是消息队列的缩写(Message Queue) 在MQ中,消息由一个应用程序发送到一个称为队列的中间件中,接着被中间件存储,并最终被另一个或多个消费者应用程序读取和处理; MQ组成:消息——生产者——队列——中间件——消费者!
54 0
|
10月前
|
消息中间件 Java
RabbitMQ(2)
RabbitMQ(2)
|
10月前
|
消息中间件 存储 缓存
RabbitMQ中的SpringAMQP(上)
RabbitMQ中的SpringAMQP(上)
113 0
|
10月前
|
消息中间件 JSON 缓存
RabbitMQ中的SpringAMQP(下)
RabbitMQ中的SpringAMQP(下)
105 0
|
10月前
|
消息中间件 存储 缓存
初识RabbitMQ
初识RabbitMQ
84 1
|
11月前
|
消息中间件 存储 网络协议
二、初识 RabbitMQ
二、初识 RabbitMQ

相关实验场景

更多