背景
利用RabbitMQ集群横向扩展能力,均衡流量压力,让消息集群的秒级服务能力达到百万,Google曾做过此类实验;有货在某些推送场景下也做了类似尝试,在此对此前实践经验以及踩得坑做些总结工作。
RabbitMQ概述
RabbitMQ是基于AMQP协议实现的消息中间件的一种,常用于在分布式系统中存储转发消息,表现为易用、可扩展、高可用等特点,最初活跃在金融系统中。消息中间件的主要作用是让服务组件之间能够解耦,消息生产者无需关注消息消费者的存在与行为。
1.Message:由Producer发出,经过Exchange路由到相应的Queue,然后Consumer从Queue中取走消费;
2.Queue:存储消息的容器,消息存储在队列里,直到有消费者连接队列并取走为止;
3.绑定(Binding):将Queue与Exchange之间按规则建立映射关系,类似建立网络路由表,通过Binding规定了Exchange如何将消息路由到某个队列中;
4.交换机(Exchange):Exchage就是路由器,每个消息都有一个路由Key的属性,交换机中有一些队列的Binding(路由规则),交换机有多种类型,如topic、direct、fanout;
5.Broker(服务器):接受客户端连接,实现AMQP消息队列和路由功能的进程;
6.虚拟主机(virtual-host):一个虚拟主机有一组交换机,队列和Binding,用户只能在虚拟主机的范围内进行权限控制,每一个服务器都有一个默认的虚拟主机(/);
7.连接(Connection):客户端与broker之间的Tcp连接;
8.信道(Channel):比连接更小的单位,创建连接后需要在其内创建信道发送消息,一个连接内可以有多个信道,这样设计是为了减少tcp连接,客户端线程尽量共用连接,不共用Channel;
RabbitMQ Brokers是一个或多个Erlang节点的逻辑分组,每个节点运行RabbitMQ应用程序并共享用户,虚拟主机,队列,交换,绑定和运行时参数。有时我们将节点的集合称为集群。在所有节点上复制RabbitMQ代理的操作所需的所有数据/状态。一个例外是消息队列,它们默认驻留在一个节点上,尽管它们是可见的,并且可以从所有节点访问。要跨集群中的节点复制队列,需要配置Mirror特性。
集群又可以分为两种,普通模式(默认模式)以两个节点(A、B)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点A(或者B),A和B两个节点仅有相同的元数据,即队列的结构。当消息进入A节点的Queue后,Consumer从B节点消费时,RabbitMQ会临时在A、B间进行消息传输,把从A中的消息实体取出并经过B发送给Consumer。所以Consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论Consumer连A或B,出口总在A,会产生瓶颈。当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了消息持久化,那么得等A节点恢复,然后才可被消费。如果没有持久化的话,就会产生消息丢失的现象。
把需要的队列做成镜像队列,队列存在与多个节点属于RabbitMQ的HA方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。
如何构建百万级消息服务
上文讲述了RabbitMQ的一些基础概念,接下来首先分析Google的测试思想,然后介绍下我们在此基础上的一些其他想法,借此了解下如何构建能够支持百万级消息并发的RabbitMQ服务。
Google共使用了32台8核30G内存的虚拟机,构建了相对来说比较庞大的rabbitmq集群,各虚拟机的作用分配如下:
30 RabbitMQ RAM节点(正常RAM节点,RabbitMQ元数据和定义仅保存在RAM中);1 RabbitMQ
Disc节点(元数据持久化节点,其中RabbitMQ代理元数据和定义也保留在光盘上);1 RabbitMQ Stats节点(统计信息节点,运行RabbitMQ管理插件,不带任何队列);
在这种高负载的生产(1345531msgs/pers)消费(1413840 msgs/pers)压力下,RabbitMQ仅有2343条消息暂时在其等待发送的队列中累积,在这样的负载下,RabbitMQ节点也没有显示内存压力,或者需要基于资源限制的启动流控机制。
使用RabbitMQ的许多用户现在大多集群规模大致为3-7个RabbitMQ节点组成的群集,从该类集群中就可以获得极好的结果。鉴于这一基础,在Google的实验中使用的30节点RabbitMQ集群与当前实践中常见的相比是相当大的。当然大数据的增长,物联网,实时分析等应用可能会增加未来许多RabbitMQ集群的规模。
在这样的大型集群中,系统的设计,构建和扩展都有一些要点。即使对于较小的集群也是如此,首先,RabbitMQ的消息传递工作的基本并行单位是队列。除了适用于某些高可用性配置的部分异常之外,RabbitMQ队列由单个Erlang进程(轻量级线程抽象)支持,通过谨慎地分配消息的生产者,相对于他们的消息最终到达的队列,可以解决单个队列所构成的潜在瓶颈。Pivotal RabbitMQ教程演示了支持各种场景和路由方案的消息架构的构建。Google使用了非常基本的例子。当然除了教程中涵盖的场景之外,RabbitMQ还存在更多的可能性,包括使用一致的哈希交换类型进行动态负载平衡场景。
其次,重要的是要注意个别节点的职责,尤其在负载非常高的集群中。可以在群集中的任何节点上启用或禁用RabbitMQ管理插件。RabbitMQ管理插件提供上述基于Web的管理UI,以及相应的基于HTTP的管理API,还可以作为统计其他集群节点报告性能指标。在大型集群中,许多节点都是报告度量,目前统计数据库都可能成为瓶颈。因此,Google在实验过程中,单独创建了一个信息统计节点,并将其从负载均衡器的后端服务器列表中排除掉,从而消息生产与消费不会经过该节点,统计信息与生产消费也就不会发生竞争资源的情况。
在AWS上使用同等规模与配置的环境,验证了Google提供的测试结果后,又做了一些别的尝试,如使用RabbitMQ Sharding插件、Consistent-hash Sharding Exchange来更加灵活的动态均衡队列压力,同样可以达到此类性能。
RabbitMQ Sharding插件
下面介绍下如何使Sharding插件,3.6.0以及以后的RabbitMQ版本启用Sharding插件,使用命令:
rabbitmq-pluginsenable rabbitmq_sharding
在大规模集群上,配置节点多分片队列,可以有效分摊单队列的性能瓶颈。这个插件能够让分片队列自动扩展,如果您添加更多的节点到您的RabbitMQ群集,那么该插件将自动在新节点中创建更多的分片。假设集群初始仅有一个节点A,配置每个节点分布4个分片队列,现在将节点B加入了节点A所在群集。插件将自动在节点b中创建4个队列,并将它们连接到分片分区。已经传递的消息将不会被重新平衡,但新到达的消息将被分区到新的队列。
默认情况下RabbitMQ的交换机以”all or nothing”方式工作,即:如果路由key与绑定到交换机的一组队列匹配,则RabbitMQ将将消息路由到该集合中的所有队列。因此,为了使这个插件能正常工作,我们需要将消息路由到一个交换机来分配消息,让消息最多被分配到一个队列。该插件提供了一种新的Exchange类型“x-modulus-hash”,它将使用传统的哈希技术应用于跨队列分区消息。“x-modulus-hash”交换机将对用于发布消息的Routing-Key进行hash,然后将hash值mod N来选择路由消息的队列,其中N是绑定到交换机的队列数。此交换将完全忽略用于将队列绑定到交换机的Routing-Key。如果只需要消息分区,而不是由此插件提供的自动队列创建,那么只需使用一致的哈希Exchange,这个后面介绍。
安装插件后,您可以通过设置与交换名称匹配的策略来定义交换分片。例如,如果我们有一个称为shard.images的交换,我们可以定义以下策略来分片:set_policy images-swarding "^images"
这将为集群中的每个节点创建2个分片队列,并使用Routing
key:”hello”对这些队列进行绑定,随后定义一个名为images的exchange。
随后,插件会自动在每个节点上创建2个分片队列,名为“sharding:images-*”。
在上面的例子中,我们在定义策略时使用路由key为“hello”。这意味着用于分片的底层交换机将使用上面指定的hello路由key将分片队列绑定到交换机。这意味着对于“Direct-Exchange”,使用路由密钥hello发布的息将被路由到所有的分片队列。如果您决定使用“Fanout-exchange”进行分片,则在绑定期间使用的“hello”路由key将被交换机忽略。如果使用“x-modulus-hash”交换,则路由key也将被忽略。因此,根据您使用的交换机,路由策略定义在路由消息时会产生影响。
Consistent-sharding Exchange
在某些情况下,你可能希望发送到交换机的消息是一致和均匀地分布在多个不同的队列。在上面的插件中如果队列数量发生变化,则不难确保新的拓扑结构仍然在不同队列之间均匀分配消息,此时就可以借助Consistent-sharding类型Exchange,与Sharding插件的主要区别是,该类Exchange不能自动创建分片队列,需要手动创建并配置Binding关系,且支持一致性hash。
在作为交换类型的一致哈希的情况下,从所接收的每个消息的Routing-key进行哈希计算后散列存储。因此,具有相同Routing-Key的消息将具有计算的相同散列,将被路由到相同的队列。将队列绑定到一致性哈希的Exchange时,绑定key是一个数字字符串,表示希望该队列在整个hash空间中占有的点数。具有相同Routing-key的所有消息将进入相同的队列。因此,如果希望队列A接收路由消息是队列B接收路由消息的两倍,那么需要将队列A绑定到Exchange的绑定Key(字符串的数字)设置为队列B的绑定Key的2倍。当然,只有当你的路由Key均匀分布在散列空间中时才是这种情况。例如,如果在所有消息上仅使用两个不同的路由Key,即使其他队列在其绑定Key中具有较高的值,两个密钥也可能路由到同一个队列。使用更大的路由Key集合,路由Key的统计分布接近绑定Key的设置的比率。
在这种情况下,随机Routing-key的消息最终将会均匀分布到两个队列中。
RabbitMQ可靠性与可用性讨论
但是如何确保消息传递的可靠性以及如何配置高可用,很多人都一直存在疑惑,实践才是检验真理的唯一标准,所以基于有货某些使用场景,也分析总结下经验与教训,希望对大家有所帮助。
场景1,如何保证消息的传递可靠,生产者与消费者互不感知,那么怎么确认生产者已将消息投递到RabbitMQ服务端,又如何确认消费者已经消费了该消息?这里需要使用的RabbtiMQ提供的生产者Confirm机制、消费者Ack机制来解决;
使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务:使信道事务发布,发布消息,提交。在这种情况下,交易是不必要的重量级,并将吞吐量降低250倍。为了弥补这一点,引入了确认机制。它模仿了协议中已经存在的消费者确认机制。
要启用确认,客户端发送confirm.select方法。根据是否设置不等待,RabbitMQ Broker可以通过confirm.select-ok进行回复。一旦在通道上使用了confirm.select方法,就被认为处于确认模式。事务通道不能进入确认模式,一旦通道处于确认模式,则不能进行事务处理。
一旦通道处于确认模式,代理和客户端都会计数消息(从第一个confirm.select开始计数)。然后Broker通过在同一个频道上发送basic.ack来确认消息。发送标签字段包含已确认消息的序列号。Broker还可以在basic.ack中设置多个字段,表明所有小于该序列号的消息都已到达并被处理。
何时确认呢?对于无法路由的消息,一旦exchange验证了消息不会被路由到任何队列(返回一个空列表的队列),Broker将发出确认。如果消息的发送属性指定了为强制性(mandatory),则basic.return将在basic.ack之前发送给客户端。否定的确认也是如此(basic.nack)。
对于可以路由的消息,当所有队列接受消息时,发送basic.ack。对于路由到持久队列的持久消息,这意味着已保存到磁盘。对于镜像队列,这意味着队列的所有镜像都已接受该消息。
当RabbitMQ交付消息给Consumer时,需要确认Message已被投递到Consumer。Acknowledgemenets作用,consumer通知server已收到消息或者成功消费消息,根据使用的确认模式,RabbitMQ可以在发送(写入TCP套接字)后或当接收到显式(“手动”)客户端确认信息时立即考虑成功传递的消息。手动发送的确认可以是正面或负面的,并使用以下协议方法之一:
basic.ack用于肯定确认;basic.nack用于否定确认(注意:这是AMQP
0-9-1的RabbitMQ扩展);basic.reject用于否定确认,但与basic.nack相比有一个限制;肯定的确认只是指示RabbitMQ记录一个消息传递。与basic.reject的否定确认具有相同的效果。差异主要在语义上:肯定确认假设一条消息已成功处理,而其负面对应方则表示传送未被处理但仍应被删除,可以批量手动确认以减少网络流量。
综上所述,在1的位置需要开启Channel的Confirm模式,接收RabbitMQ服务端发送的确认消息已到达的Ack信息;在3的位置,消费者在成功消费或者业务处理失败后,需要显示告诉RabbitMQ服务端,消息已被消费成功或者失败;
在某些类型的网络故障中,数据包丢失可能意味着中断的TCP连接需要较长时间才能够被操作系统检测到。AMQP 0.9.1提供心跳功能,以确保应用程序层及时发现连接中断。在我们的部署架构中,ELB与RabbitMQ之间就是通过此机制来判断服务是否存活,是否提示生产者服务端已挂,异步等待confirm的消息直接进入unconfirm的处理环节。
另外为了避免在代理中丢失消息,我们需要应对代理重新启动,代理硬件故障,甚至破坏代理崩溃。为了确保重新启动时消息和代理定义生效,我们需要确保它们在磁盘上持久化。AMQP标准具有交换,队列和持久消息的耐久性概念,要求持久对象或持久消息将在重新启动后生存。
场景2,如何实现处理失败后重试机制;
某些情况下,业务在处理消息时可能会失败,此时需要做的是重试,而不是直接丢弃;当然重试也不能仅仅是直接重试,一旦有任务长时间失败,会导致后面的消息无法被正常处理,此时可以借助死信机制转发投递到重试队列后,随后再尝试重新处理该消息;
那如何实现呢?下面介绍下具体操作;
x-dead-letter-exchange:死信转发的exchange;x-dead-letter-routing-key:死信转发时的routing-key;该队列绑定到名为“amp.topic”的topic类型exchange,接收routing-key为“yoho_test_retry”的消息;
死信转发到“amp.topic”的exchange,routing-key为“yoho_test_retry”(即工作队列接收该主题消息);x-message-ttl:message在重试队列中存活的时间,也就是延迟多久重试;该队列绑定到“amp.topic”,接收routing-key为“retry.yoho_test_retry”的消息(即接收工作队列的死信),这样就可以实现重试队列的机制了。
场景3,如何实现定时任务;
定时任务,这也是一种常见的需求,那如何在RabbitMQ中实现这个能力,可以让某些任务延时执行。其实同样的也可以借助死信机制来实现,如队列A用于接收暂存Producer的消息,队列B用于Consumer的消费,在队列A中指定消息的ttl即生命周期时长,同时指定其死信交换机DLXs,一旦消息在队列中存活时长超过ttl的设定值,那么消息会被转发到DLXs,绑定队列B到DLXs,即可接收到队列A的死信;
下面具体看下操作流程;
从“amp.topic”的exchange中接收routing-key为“delay.yoho_test_delay”主题的消息;
死信转到交换机“amp.topic”中,消息的routing-key为“delay.yoho_test_delay”(即工作队列接收的延迟消息的队列),消息在延迟队列中存活时间ttl,同时该队列绑定到“amp.topic”交换机,接收routing-key为“yoho_test_delay”注意的消息(即生产发送消息指定的topic);如此一来延迟队列接收消息后,等待ttl时长,将消息转发到工作队列中,即可实现延迟队列机制;
场景4,如何跨中心共享消息;
有时跨中心业务需要共享消息,如缓存清理等,在业务代码中分别向多个中心的RabbitMQ发布消费消息显然不是一种比较好的解决方案,那还有什么好的方法呢,RabbitMQ为此提供了Federation插件来很好的解决此类问题;
Federation插件是一个在不需要cluster,而在brokers之间传输消息的高性能插件,Federation插件可以在brokers或者cluster之间传输消息,连接的双方可以使用不同的users和virtual hosts,或者双方的rabbitmq和erlang的版本不一致,Federation插件使用AMQP协议通讯,可以接受不连续的传输。
Federation插件允许你配置一个exchanges Federation或者queues Federation。一个exchange/queues federation允许你从一个或者多个upstream接受信息(就是远程的exchange/queues或者其他brokers),一个federation exchange可以路由消息到一个本地queue中,一个federation queue可以使一个本地的consumer接受从upstream queue过来的消息。下面分别看下如何启用,创建以及两者的区别:
(1)启动Federation插件命令:
rabbitmq-pluginsenable rabbitmq_federation
(2)启动web配置插件:
rabbitmq-pluginsenable rabbitmq_federation_management
Federation exchanges,可以看成downstream从upstream主动拉取消息,但并不是拉取所有消息,必须是在downstream上已经明确定义Bindings关系的exchange,也就是有实际的物理queue来接收消息,才会从upstream拉取消息到downstream。使用AMQP协议实施代理间通信。Downstream会将绑定关系组合在一起,绑定/解除绑定命令将发送到upstream交换机。因此,Federation交换机只接收具有订阅的消息。
任何upstream exchange接收到的消息都可能被downstream中Federation exchange接收到,但直接发送给Federation Exchange的消息是不能upstream中所绑定的exchange接收到的。
Federation queues与Federationexchange最大的区别共享消息的机制不同,Federation队列仅在本地消耗消息时检索消息,消费者需要消息,并且Upstream队列具有未被消费的消息。这样做的目的是确保消息仅在需要时,才会在联合队列之间传输。
先看下创建过程,如下创建一个名为“fqueue_test”的federation
Federation队列可以被声明为任何其他队列。为了使RabbitMQ能够识别出队列需要联合,还有哪些节点消息应该被消耗,Downstream(消费)节点需要进行配置。
通过声明策略来完成配置。策略是队列名称匹配的模式。匹配队列将联合。Federation队列只能属于一个策略。如果多个策略与队列名称匹配,则应用优先级最高的策略。当两个策略具有相同的优先级时,随机选择匹配的策略。
Federation队列可以作为另一个Federation队列的“上游”,甚至可以形成“循环”,例如,队列A将队列B声明为上游,队列B将队列A声明为上游。允许更复杂的多重连接的安排。Federation队列将使用AMQP连接到其所有上游队列。在声明或配置联合队列时,每个上游队列都将列出用于建立链接的连接属性。
每个单独的队列分别应用其参数,例如,如果在联合队列上设置x-max-length,则该队列的长度将受限制(可能会在其已满时丢弃消息),但与其联合的其他队列将不受影响。特别要注意的是,当每个队列或每个消息的TTL被使用时,当一个消息被传送到另一个队列时,它的定时器将被重置。
与Federation交换机不同,在Federation队列之间可以转发消息的次数没有限制。在一组相互联合的队列中,消息将移动到空闲消费容量的位置。因此,如果闲置消费容量继续移动,消息将继续移动。上述就是Federation的使用方法以及注意点,当然与其他插件的配合,可以衍生出多种使用方法。
场景5,如何保证消息队列的高可用,这样的场景很多比如核心业务的订单服务、erp服务等等。
默认情况下,RabbitMQ群集中的队列位于单个节点(首次被声明的节点上),而Exchanges和Bindings可以认为在所有节点上存在,可以选择在cluster中跨节点节点之间配置为镜像队列。每个镜像队列由一个master和一个或多个slave组成,如果master因为某些原因失效,则将从slave中选择一个提升为master。
发布到队列的消息将复制到所有镜像。消费者连接到主机,无论它们连接到哪个节点,镜像会丢弃已在主设备上确认的消息。队列镜像因此增强了可用性,但不跨节点分配负载(所有参与节点都执行所有工作)。
队列已被配置为镜像,master节点位于server5,slave节点位于server6,此时,随意关闭任意一台RabbitMQ节点,该队列都可以正常对外提供服务。
A先挂,B后挂,master转移B,此时需先拉起B,后拉起A,可恢复镜像队列;
A、B同时挂,同时拉起,可恢复镜像队列;
若新加入cluster节点,最好不要在生产环境手工同步,采用自然同步方式,对于没有工作queue可以手工同步操作(同步操作时,queue是不可用的);当所有slave都处在(与master)未同步状态时,并且ha-promote-on-shutdown policy设置为when-syned(默认)时,如果master因为主动的原因停掉,比如是通过rabbitmqctl stop命令停止或者优雅关闭OS,那么slave不会接管master,也就是说此时镜像队列不可用;但是如果master因为被动原因停掉,比如VM或者OS crash了,那么slave会接管master。这个配置项隐含的价值取向是优先保证消息可靠不丢失,放弃可用性。如果ha-promote-on-shutdown policy设置为alway,那么不论master因为何种原因停止,slave都会接管master,优先保证可用性。
A、B都未挂,两者网络异常,各自为master,此时出现网络分区冲突,必须手工介入保证消息不丢失,万不可随意重启导致数据丢失(不论是否持久化),将一台数据量较小的从cluster中剔除,消费完成后再重启恢复镜像;或者将其中一台从集群中剔除后,加入另外一台slave,再消费完成剔除的节点中数据;(会出现重复消费,此时需要客户端做幂等处理保证唯一一次消费)
当然在高可用的场景下,队列的性能会受到一定的影响,此时可以借助上面提到的Sharding机制(根据场景选择x-modulus-hash还是consistent-hash),解决单队列的性能瓶颈,在高可用、高并发下寻求一个动态的平衡;
上图为镜像场景的压测结果,对比普通集群,镜像对性能的影响很明显,消息持久化也拉低了集群的性能,适当增加Prefetch可以提高集群性能。
性能与高可靠、高可用,鱼和熊掌不可兼得,所以欲提升RabbitMQ集群或单节点服务的性能,牺牲可靠性(根据场景来),在消费能力范围内,尽量提高prefetch的数量,其次就是简单粗暴型(加机器(队列实际存储节点性能未榨干,建议队列均衡分配到各节点)、加配置)。
Spring AMQP提供了一个API,可轻松访问AMQP消息代理。像往常一样,Spring模板作为技术细节的抽象。对于AMQP,AmqpTemplate可以做到这一点。
Spring-amqp项目拥有所有必要的通用接口(例如AmqpTemplate)和API类,而具体的实现则依赖spring-rabbitmq,Spring-rabbitmq依赖于RabbitMQ amqp-client的通用Java API。客户端应用程序仅依靠spring-amqp来实现松耦合。能够从一个AMQP代理切换到另一个AMQP代理,而不会在代码中进行任何重大更改。