MQ - 09 RabbitMQ的架构设计与实现

简介: MQ - 09 RabbitMQ的架构设计与实现

导图


概述

最基础的消息队列应该具备通信协议、网络模块、存储模块、生产者、消费者五个模块。

接下来我们从消息和流的角度,分别看一下

  • 消息方向的消息队列 RabbitMQ、RocketMQ
  • 流方向的消息队列 Kafka、Pulsar

在这五个模块的实现思路和设计思想 。

今天先看看 RabbitMQ。


RabbitMQ 系统架构

我们先来看一下 RabbitMQ 的系统架构。

如上图所示,RabbitMQ 由 Producer、Broker、Consumer 三个大模块组成。生产者将数据发送到 Broker,Broker 接收到数据后,将数据存储到对应的 Queue 里面,消费者从不同的 Queue 消费数据。

那么除了 Producer、Broker、Queue、Consumer、ACK 这几个消息队列的基本概念外,它还有 Exchange、Bind、Route 这几个独有的概念。下面来简单解释下。

Exchange 称为交换器,它是一个逻辑上的概念,用来做分发,本身不存储数据。流程上生产者先将消息发送到 Exchange,而不是发送到数据的实际存储单元 Queue 里面。

然后 Exchange 会根据一定的规则将数据分发到实际的 Queue 里面存储。这个分发过程就是 Route(路由),设置路由规则的过程就是 Bind(绑定)。

即 Exchange 会接收客户端发送过来的 route_key,然后根据不同的路由规则,将数据发送到不同的 Queue 里面。

这里需要注意的是,在 RabbitMQ 中是没有 Topic 这个用来组织分区的逻辑概念的。RabbitMQ 中的 Topic 是指 Topic 路由模式,是一种路由模式,和消息队列中的 Topic 意义是完全不同的

那为什么 RabbitMQ 会有 Exchange、Bind、Route 这些独有的概念呢?

在我看来,主要和当时业界的架构设计思想以及主导设计 AMQP 协议的公司背景有关。当时的设计思路是:希望发消息跟写信的流程一样,可以有一个集中的分发点(邮局),通过填写好地址信息,最终将信投递到目的地。这个集中分发点(邮局)就是 Exchange,地址信息就是 Route,填写地址信息的操作就是 Bind,目的地是 Queue。

讲清楚基本概念和架构,我们就围绕着前面提到的五个模块来分析一下 RabbitMQ,先来看一下协议和网络模块


协议和网络模块

网络通信协议层面,RabbitMQ 数据流是基于四层 TCP 协议通信的,跑在 TCP 上的应用层协议是 AMQP。

如果开启 Management 插件,也可以支持 HTTP 协议的生产和消费。TCP + AMQP 是数据流的默认访问方式,也是官方推荐的使用方式,因为它性能会比 HTTP 高很多。

RabbitMQ 在协议内容和连接管理方面,都是遵循 AMQP 规范。即 RabbitMQ 的模型架构和 AMQP 的模型架构是一样的,交换器、交换器类型、队列、绑定、路由键等都是遵循 AMQP 协议中相应的概念

AMQP 是一个应用层的通信协议,可以看作一系列结构化命令的集合,用来填充 TCP 层协议的 body 部分。通过协议命令进行交互,可以完成各种消息队列的基本操作,如 Connection.Start(建立连接)、Basic.Publish(发送消息)等等,详细的 AMQP 协议内容可以参考文档 AMQP Working Group 1.0 Final

下面是一张生产消息流程的协议命令交互图,大概包含了建立连接、发送消息、关闭连接三个步骤。


讲完了协议,我们来看看网络模块。

先来看下面这张图,在 RabbitMQ 的网络层有 Connectoion 和 Channel 两个概念需要关注。

Connection 是指 TCP 连接,Channel 是 Connection 中的虚拟连接。两者的关系是:

  • 一个客户端和一个 Broker 之间只会建立一条 TCP 连接,就是指 Connection。
  • Channel(虚拟连接)的概念在这个连接中定义,一个 Connection 中可以创建多个 Channel。

客户端和服务端的实际通信都是在 Channel 维度通信的。这个机制可以减少实际的 TCP 连接数量,从而降低网络模块的损耗。从设计角度看,也是基于 IO 复用、异步 I/O 的思路来设计的。

从编码实现的角度,RabbitMQ 的网络模块设计会比较简单。主要包含 tcp_listener、tcp_acceptor、rabbit_reader 三个进程。如下图所示,RabbitMQ 服务端通过

  • tcp_listener 监听端口,
  • tcp_acceptor 接收请求,
  • rabbit_reader 处理和返回请求。

本质上来看是也是一个多线程的网络模型。


数据存储

接下来我们看看 RabbitMQ 的存储模块。

RabbitMQ 的存储模块也包含元数据存储与消息数据存储两部分。如下图所示,RabbitMQ 的两类数据都是存储在 Broker 节点上的,不会依赖第三方存储引擎。

我们先来看一下元数据存储。

元数据存储 —> 自带的分布式数据库 Mnesia

RabbitMQ 的元数据都是存在于 Erlang 自带的分布式数据库 Mnesia 中的。即每台 Broker 都会起一个 Mnesia 进程,用来保存一份完整的元数据信息。因为 Mnesia 本身是一个分布式的数据库,自带了多节点的 Mnesia 数据库之间的同步机制。所以在元数据的存储模块,RabbitMQ 的 Broker 只需要调用本地的 Mnesia 接口保存、变更数据即可。不同节点的元数据同步 Mnesia 会自动完成。

Mnesia 对 RabbitMQ 的作用,相当于 ZooKeeper 对于 Kafka、NameServer 对于 RocketMQ 的作用。因为 Mnesia 是内置在 Broker 中,所以部署 RabbitMQ 集群时,你会发现只需要部署 Broker,不需要部署其他的组件。这种部署结构就很简单清晰,从而也降低了后续的运维运营成本。

在一些异常的情况下,如果不同节点上的 Mnesia 之间的数据同步出现问题,就会导致不同的 Mnesia 数据库之间数据不一致,进而导致集群出现脑裂、无法启动等情况。此时就需要手动修复异常的 Mnesia 实例上的数据。

因为 Mnesia 本身是一个数据库,所以它和数据库一样,可以进行增删改查的操作。需要了解 Mnesia 的更多操作,你可以参考 ErLang Mnesia


消息数据存储

如下图所示,RabbitMQ 消息数据的最小存储单元是 Queue,即消息数据是按顺序写入存储到 Queue 里面的。在底层的数据存储方面,所有的 Queue 数据是存储在同一个“文件”里面的。这个“文件”是一个虚拟的概念,表示所有的 Queue 数据是存储在一起的意思。

这个“文件”由队列索引(rabbit_queue_index)和消息存储(rabbitmq_msg_store)两部分组成。即在节点维度,所有 Queue 数据都是存储在 rabbit_msg_store 里面的,每个节点上只有一个 rabbit_msg_store,数据会依次顺序写入到 rabbit_msg_store 中。

rabbit_msg_store 是一个逻辑概念,底层的实际存储单元分为两个,msg_store_persistent 和 msg_store_transient,分别负责持久化消息和非持久化消息的存储。

msg_store_persistent 和 msg_store_transient 在操作系统上是以文件夹的形式表示的,具体的数据存储是以不同的文件段的形式存储在目录中,所有消息都会以追加的形式写入到文件中。

当一个文件的大小超过了配置的单个文件的最大值,就会关闭这个文件,然后再创建一个文件来存储数据。关于 RabbitMQ 底层的数据存储结构,如下图所示:

队列索引负责存储、维护队列中落盘消息的信息,包括消息的存储位置、是否交付、是否 ACK 等等信息。队列索引是 Queue 维度的,每个 Queue 都有一个对应的队列索引。

RabbitMQ 也提供了过期时间(TTL)机制,用来删除集群中没用的消息。它支持单条消息和队列两个维度来设置数据过期时间。如果在队列上设置 TTL,那么队列中的所有消息都有相同的过期时间。我们也可以对单条消息单独设置 TTL,每条消息的 TTL 可以不同。如果两种方案一起使用,那么消息的 TTL 就会以两个值中最小的那个为准。如果不设置 TTL,则表示此消息不会过期。

删除消息时,不会立即删除数据,只是从 Erlang 中的 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。此时文件中的消息不会立即被删除,会被标记为已删除数据,直到一个文件中都是可以删除的数据时,再将这个文件删除,这个动作就是常说的延时删除。另外内核有检测机制,会检查前后两个文件中的数据是否可以合并,当符合合并规则时,会进行段文件的合并。

在了解了 RabbitMQ 的协议、网络模块和数据存储后,我们再来看一下 RabbitMQ 的生产者和消费者的实现。


生产者和消费者

当生产者和消费者连接到 Broker 进行生产消费的时候,是直接和 Broker 交互的,不需要客户端寻址。客户端连接 Broker 的方式,跟我们通过 HTTP 服务访问 Server 是一样的,都是直连的。部署架构如下图所示:

abbitMQ 集群部署后,为了提高容灾能力,就需要在集群前面挂一层负载均衡来进行灾备。客户端拿到负载均衡 IP 后,在生产或消费时使用这个 IP 和服务端直接建立连接。因为 Queue 是具体存储数据的单元,不同的 Queue 有可能分布在不同的 Broker 上,就有可能出现生产或消费基于负载均衡 IP 请求到的 Broker,并不是当前 Queue 所在的 Broker,从而导致生产消费失败。

为了解决这个问题,在每个 Broker 上会设置有转发的功能。在实现上,每台 Broker 节点都会保存集群所有的元数据信息。当 Broker 收到请求后,根据本地缓存的元数据信息判断 Queue 是否在本机上,如果不在本机,就会将请求转发到 Queue 所在的目标节点。

从客户端的实现来看,因为各个语言的实现机制不太一样,基础模块的连接管理、心跳管理、序列化等部分遵循各编程语言的开发规范去实现。例如网络模块的实现,如果客户端是用 Java 语言写的,那么可以使用 Java NIO 库完成网络模块的开发。客户端和服务端传输协议的内容遵循 AMQP 协议,底层以二进制流的形式序列化数据。即根据 AMQP 协议的格式构建内容后,然后序列化为二进制的格式,传递给 Broker 进行处理。

生产端发送数据不是直接发送到 Queue,而是直接发送到 Exchange。即发送时需要指定 Exchange 和 route_key,服务端会根据这两个信息,将消息数据分发到具体的 Queue。因为 Exchange 和 route_key 都是一个逻辑概念,数据是直接发送到 Broker 的,然后在服务端根据路由绑定规则,将数据分发到不同的 Queue 中,所以在客户端是没有发送生产分区分配策略的逻辑。其实从某种程度来看,Exchagne 和 Route 的功能就是生产分区分配的过程,只是将这个逻辑从客户端移动到了服务端而已

在消费端,RabbitMQ 支持 Push(推)和 Pull(拉)两种模式,如果

  • 使用了 Push 模式,Broker 会不断地推送消息给消费者。不需要客户端主动来拉,只要服务端有消息就会将数据推给客户端。当然推送消息的个数会受到 channel.basicQos 的限制,不能无限推送,在消费端会设置一个缓冲区来缓冲这些消息。
  • 拉模式是指客户端不断地去服务端拉取消息,RabbitMQ 的拉模式只支持拉取单条消息。

在 AMQP 协议中,是没有定义 Topic 和消费分组的概念的,所以在消费端没有消费分区分配、消费分组 Rebalance 等操作,消费者是直接消费 Queue 数据的。

为了保证消费流程的可靠性,RabbitMQ 也提供了消息确认机制。消费者在消费到数据的时候,会调用 ACK 接口来确认数据是否被成功消费。

底层提供了自动 ACK 和手动 ACK 两种机制。

  • 自动 ACK 表示当客户端消费到数据后,消费者会自动发送 ACK,默认是自动 ACK。
  • 手动 ACK 表示客户端消费到数据后,需要手动调用。

ACK 的时候,支持单条 ACK 和批量 ACK 两种动作,批量 ACK 可以用来提升 ACK 效率。另外,为了提升 ACK 动作的性能,有些客户端也支持异步的 ACK。


在了解了上述的五个模块后,最后我们来看一下 RabbitMQ 对 HTTP 协议的支持和管控操作。

HTTP 协议支持和管控操作

RabbitMQ 内核本身不支持 HTTP 协议的生产、消费和集群管控等操作。如果需要支持,则需要先手动开启 Management 插件,通过插件的形式让内核支持这个功能。

大部分情况下,都会建议你启用 Management 插件,否则集群使用就会不太方便。如下图所示,从实现上来看 Management 插件对 HTTP 协议的支持,就是在开启插件的时候,会启动一个新的 HTTP Server 来监听一个新的端口。

客户端只需要访问这个端口提供的 HTTP 接口,就可以完成 HTTP 读写数据和一些集群管控的操作。如果你想了解更多细节,可以查看这个文档 Management Plugin

开启插件后,就可以通过 HTTP 接口实现生产、消费、集群的配置、资源的创建、删除等操作。比如下面是一个查看 Vhost 列表的 curl 命令示例:

curl -i --header "authorization: Bearer <token>" http://localhost:15672/api/vhosts     

RabbitMQ 从生产到消费的全过程

跟经典的消息队列一样,RabbitMQ 的生产到消费总共经过生产者、Broker、消费者三个模块。大致的流程如下:

在生产端,客户端根据 AMQP 协议定义的命令字(如 Connection.Start/Start-Ok、Connection.Tune/Tune-Ok),通过四层的 TCP 协议和 Broker 创建 Connection、Channel 进行通信。

客户端直连 Broker 服务,不需要经过寻址,然后客户端需要指定 Exchange、route_key 发送消息。因为 AMQP 没有支持批量发送的协议,消息会立即发送给给服务端。通信协议的内容格式、序列化和反序列化遵循 AMQP 的标准。

Broker 收到消息后,根据 AMQP 协议反序列化解析出请求内容。根据 Exchange 和 route_key 的信息,结合路由模式,将数据分发到具体的 Queue 中。存储层收到消息后,底层会将这条数据的结构进行整合,添加一些额外信息,如写入时间等,然后将数据写入到同一个文件存储。Broker 支持数据过期机制,当消息过期后,数据会被删除。

消费端直接指定 Queue 消费,不需要经过消费分组、分区分配的过程。消费端跟生产端一样,根据 AMQP 协议连接上 Broker 后,消费端直接从 Queue 中消费数据,消费完成后通过手动 ACK 或自动 ACK 的方式 ACK 消息。


总结

RabbitMQ 主要有 Producer、Broker、Consumer、Exchange、Queue、Route、Bind、Connection、Channel、ACK 等概念。

总结 RabbitMQ,可以从以下七个方面入手:

  • 协议层基于 AMQP 标准开发。
  • 网络层核心数据流基于 TCP 协议通信,并通过 Connection 和 Channel 机制实现连接的复用,以减少创建的 TCP 连接数量。
  • 存储层基于多个 Queue 数据统一到一个文件存储的思路设计,同时支持分段存储和基于时间的数据过期机制。
  • 元数据存储是基于 Erlang 内置的数据库 Mnesia 来实现。
  • 客户端的访问是直连的,没有客户端寻址机制。
  • 生产端是通过 Exchange 和 Route 写入数据的,生产数据的分发是在服务端完成的,其他消息队列的分发一般都是在客户端。
  • 消费端没有消费分组、消费分区分配等概念,直连 Queue 消费,同时也提供了手动和自动两种 ACK 机制。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
4月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
76 3
|
4月前
|
消息中间件 存储 Java
RabbitMQ 在微服务架构中的高级应用
【8月更文第28天】在微服务架构中,服务之间需要通过轻量级的通信机制进行交互。其中一种流行的解决方案是使用消息队列,如 RabbitMQ,来实现异步通信和解耦。本文将探讨如何利用 RabbitMQ 作为服务间通信的核心组件,并构建高效的事件驱动架构。
174 2
|
1月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
5月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
108 1
|
2月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
35 1
|
3月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ 版架构优化评测
云消息队列RabbitMQ 版架构优化评测
70 6
|
3月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
208 0
|
4月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别

热门文章

最新文章