导图
概述
最基础的消息队列应该具备通信协议、网络模块、存储模块、生产者、消费者五个模块。
接下来我们从消息和流的角度,分别看一下
- 消息方向的消息队列 RabbitMQ、RocketMQ
- 流方向的消息队列 Kafka、Pulsar
在这五个模块的实现思路和设计思想 。
今天先看看 RabbitMQ。
RabbitMQ 系统架构
我们先来看一下 RabbitMQ 的系统架构。
如上图所示,RabbitMQ 由 Producer、Broker、Consumer 三个大模块组成。生产者将数据发送到 Broker,Broker 接收到数据后,将数据存储到对应的 Queue 里面,消费者从不同的 Queue 消费数据。
那么除了 Producer、Broker、Queue、Consumer、ACK 这几个消息队列的基本概念外,它还有 Exchange、Bind、Route 这几个独有的概念。下面来简单解释下。
Exchange 称为交换器,它是一个逻辑上的概念,用来做分发,本身不存储数据。流程上生产者先将消息发送到 Exchange,而不是发送到数据的实际存储单元 Queue 里面。
然后 Exchange 会根据一定的规则将数据分发到实际的 Queue 里面存储。这个分发过程就是 Route(路由),设置路由规则的过程就是 Bind(绑定)。
即 Exchange 会接收客户端发送过来的 route_key,然后根据不同的路由规则,将数据发送到不同的 Queue 里面。
这里需要注意的是,在 RabbitMQ 中是没有 Topic 这个用来组织分区的逻辑概念的。RabbitMQ 中的 Topic 是指 Topic 路由模式,是一种路由模式,和消息队列中的 Topic 意义是完全不同的。
那为什么 RabbitMQ 会有 Exchange、Bind、Route 这些独有的概念呢?
在我看来,主要和当时业界的架构设计思想以及主导设计 AMQP 协议的公司背景有关。当时的设计思路是:希望发消息跟写信的流程一样,可以有一个集中的分发点(邮局),通过填写好地址信息,最终将信投递到目的地。这个集中分发点(邮局)就是 Exchange,地址信息就是 Route,填写地址信息的操作就是 Bind,目的地是 Queue。
讲清楚基本概念和架构,我们就围绕着前面提到的五个模块来分析一下 RabbitMQ,先来看一下协议和网络模块。
协议和网络模块
在网络通信协议层面,RabbitMQ 数据流是基于四层 TCP 协议通信的,跑在 TCP 上的应用层协议是 AMQP。
如果开启 Management 插件,也可以支持 HTTP 协议的生产和消费。TCP + AMQP 是数据流的默认访问方式,也是官方推荐的使用方式,因为它性能会比 HTTP 高很多。
RabbitMQ 在协议内容和连接管理方面,都是遵循 AMQP 规范。即 RabbitMQ 的模型架构和 AMQP 的模型架构是一样的,交换器、交换器类型、队列、绑定、路由键等都是遵循 AMQP 协议中相应的概念。
AMQP 是一个应用层的通信协议,可以看作一系列结构化命令的集合,用来填充 TCP 层协议的 body 部分。通过协议命令进行交互,可以完成各种消息队列的基本操作,如 Connection.Start(建立连接)、Basic.Publish(发送消息)等等,详细的 AMQP 协议内容可以参考文档 AMQP Working Group 1.0 Final
下面是一张生产消息流程的协议命令交互图,大概包含了建立连接、发送消息、关闭连接三个步骤。
讲完了协议,我们来看看网络模块。
先来看下面这张图,在 RabbitMQ 的网络层有 Connectoion 和 Channel 两个概念需要关注。
Connection 是指 TCP 连接,Channel 是 Connection 中的虚拟连接。两者的关系是:
- 一个客户端和一个 Broker 之间只会建立一条 TCP 连接,就是指 Connection。
- Channel(虚拟连接)的概念在这个连接中定义,一个 Connection 中可以创建多个 Channel。
客户端和服务端的实际通信都是在 Channel 维度通信的。这个机制可以减少实际的 TCP 连接数量,从而降低网络模块的损耗。从设计角度看,也是基于 IO 复用、异步 I/O 的思路来设计的。
从编码实现的角度,RabbitMQ 的网络模块设计会比较简单。主要包含 tcp_listener、tcp_acceptor、rabbit_reader 三个进程。如下图所示,RabbitMQ 服务端通过
- tcp_listener 监听端口,
- tcp_acceptor 接收请求,
- rabbit_reader 处理和返回请求。
本质上来看是也是一个多线程的网络模型。
数据存储
接下来我们看看 RabbitMQ 的存储模块。
RabbitMQ 的存储模块也包含元数据存储与消息数据存储两部分。如下图所示,RabbitMQ 的两类数据都是存储在 Broker 节点上的,不会依赖第三方存储引擎。
我们先来看一下元数据存储。
元数据存储 —> 自带的分布式数据库 Mnesia
RabbitMQ 的元数据都是存在于 Erlang 自带的分布式数据库 Mnesia 中的。即每台 Broker 都会起一个 Mnesia 进程,用来保存一份完整的元数据信息。因为 Mnesia 本身是一个分布式的数据库,自带了多节点的 Mnesia 数据库之间的同步机制。所以在元数据的存储模块,RabbitMQ 的 Broker 只需要调用本地的 Mnesia 接口保存、变更数据即可。不同节点的元数据同步 Mnesia 会自动完成。
Mnesia 对 RabbitMQ 的作用,相当于 ZooKeeper 对于 Kafka、NameServer 对于 RocketMQ 的作用。因为 Mnesia 是内置在 Broker 中,所以部署 RabbitMQ 集群时,你会发现只需要部署 Broker,不需要部署其他的组件。这种部署结构就很简单清晰,从而也降低了后续的运维运营成本。
在一些异常的情况下,如果不同节点上的 Mnesia 之间的数据同步出现问题,就会导致不同的 Mnesia 数据库之间数据不一致,进而导致集群出现脑裂、无法启动等情况。此时就需要手动修复异常的 Mnesia 实例上的数据。
因为 Mnesia 本身是一个数据库,所以它和数据库一样,可以进行增删改查的操作。需要了解 Mnesia 的更多操作,你可以参考 ErLang Mnesia
消息数据存储
如下图所示,RabbitMQ 消息数据的最小存储单元是 Queue,即消息数据是按顺序写入存储到 Queue 里面的。在底层的数据存储方面,所有的 Queue 数据是存储在同一个“文件”里面的。这个“文件”是一个虚拟的概念,表示所有的 Queue 数据是存储在一起的意思。
这个“文件”由队列索引(rabbit_queue_index)和消息存储(rabbitmq_msg_store)两部分组成。即在节点维度,所有 Queue 数据都是存储在 rabbit_msg_store 里面的,每个节点上只有一个 rabbit_msg_store,数据会依次顺序写入到 rabbit_msg_store 中。
rabbit_msg_store 是一个逻辑概念,底层的实际存储单元分为两个,msg_store_persistent 和 msg_store_transient,分别负责持久化消息和非持久化消息的存储。
msg_store_persistent 和 msg_store_transient 在操作系统上是以文件夹的形式表示的,具体的数据存储是以不同的文件段的形式存储在目录中,所有消息都会以追加的形式写入到文件中。
当一个文件的大小超过了配置的单个文件的最大值,就会关闭这个文件,然后再创建一个文件来存储数据。关于 RabbitMQ 底层的数据存储结构,如下图所示:
队列索引负责存储、维护队列中落盘消息的信息,包括消息的存储位置、是否交付、是否 ACK 等等信息。队列索引是 Queue 维度的,每个 Queue 都有一个对应的队列索引。
RabbitMQ 也提供了过期时间(TTL)机制,用来删除集群中没用的消息。它支持单条消息和队列两个维度来设置数据过期时间。如果在队列上设置 TTL,那么队列中的所有消息都有相同的过期时间。我们也可以对单条消息单独设置 TTL,每条消息的 TTL 可以不同。如果两种方案一起使用,那么消息的 TTL 就会以两个值中最小的那个为准。如果不设置 TTL,则表示此消息不会过期。
删除消息时,不会立即删除数据,只是从 Erlang 中的 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。此时文件中的消息不会立即被删除,会被标记为已删除数据,直到一个文件中都是可以删除的数据时,再将这个文件删除,这个动作就是常说的延时删除。另外内核有检测机制,会检查前后两个文件中的数据是否可以合并,当符合合并规则时,会进行段文件的合并。
在了解了 RabbitMQ 的协议、网络模块和数据存储后,我们再来看一下 RabbitMQ 的生产者和消费者的实现。
生产者和消费者
当生产者和消费者连接到 Broker 进行生产消费的时候,是直接和 Broker 交互的,不需要客户端寻址。客户端连接 Broker 的方式,跟我们通过 HTTP 服务访问 Server 是一样的,都是直连的。部署架构如下图所示:
abbitMQ 集群部署后,为了提高容灾能力,就需要在集群前面挂一层负载均衡来进行灾备。客户端拿到负载均衡 IP 后,在生产或消费时使用这个 IP 和服务端直接建立连接。因为 Queue 是具体存储数据的单元,不同的 Queue 有可能分布在不同的 Broker 上,就有可能出现生产或消费基于负载均衡 IP 请求到的 Broker,并不是当前 Queue 所在的 Broker,从而导致生产消费失败。
为了解决这个问题,在每个 Broker 上会设置有转发的功能。在实现上,每台 Broker 节点都会保存集群所有的元数据信息。当 Broker 收到请求后,根据本地缓存的元数据信息判断 Queue 是否在本机上,如果不在本机,就会将请求转发到 Queue 所在的目标节点。
从客户端的实现来看,因为各个语言的实现机制不太一样,基础模块的连接管理、心跳管理、序列化等部分遵循各编程语言的开发规范去实现。例如网络模块的实现,如果客户端是用 Java 语言写的,那么可以使用 Java NIO 库完成网络模块的开发。客户端和服务端传输协议的内容遵循 AMQP 协议,底层以二进制流的形式序列化数据。即根据 AMQP 协议的格式构建内容后,然后序列化为二进制的格式,传递给 Broker 进行处理。
生产端发送数据不是直接发送到 Queue,而是直接发送到 Exchange。即发送时需要指定 Exchange 和 route_key,服务端会根据这两个信息,将消息数据分发到具体的 Queue。因为 Exchange 和 route_key 都是一个逻辑概念,数据是直接发送到 Broker 的,然后在服务端根据路由绑定规则,将数据分发到不同的 Queue 中,所以在客户端是没有发送生产分区分配策略的逻辑。其实从某种程度来看,Exchagne 和 Route 的功能就是生产分区分配的过程,只是将这个逻辑从客户端移动到了服务端而已。
在消费端,RabbitMQ 支持 Push(推)和 Pull(拉)两种模式,如果
- 使用了 Push 模式,Broker 会不断地推送消息给消费者。不需要客户端主动来拉,只要服务端有消息就会将数据推给客户端。当然推送消息的个数会受到 channel.basicQos 的限制,不能无限推送,在消费端会设置一个缓冲区来缓冲这些消息。
- 拉模式是指客户端不断地去服务端拉取消息,RabbitMQ 的拉模式只支持拉取单条消息。
在 AMQP 协议中,是没有定义 Topic 和消费分组的概念的,所以在消费端没有消费分区分配、消费分组 Rebalance 等操作,消费者是直接消费 Queue 数据的。
为了保证消费流程的可靠性,RabbitMQ 也提供了消息确认机制。消费者在消费到数据的时候,会调用 ACK 接口来确认数据是否被成功消费。
底层提供了自动 ACK 和手动 ACK 两种机制。
- 自动 ACK 表示当客户端消费到数据后,消费者会自动发送 ACK,默认是自动 ACK。
- 手动 ACK 表示客户端消费到数据后,需要手动调用。
ACK 的时候,支持单条 ACK 和批量 ACK 两种动作,批量 ACK 可以用来提升 ACK 效率。另外,为了提升 ACK 动作的性能,有些客户端也支持异步的 ACK。
在了解了上述的五个模块后,最后我们来看一下 RabbitMQ 对 HTTP 协议的支持和管控操作。
HTTP 协议支持和管控操作
RabbitMQ 内核本身不支持 HTTP 协议的生产、消费和集群管控等操作。如果需要支持,则需要先手动开启 Management 插件,通过插件的形式让内核支持这个功能。
大部分情况下,都会建议你启用 Management 插件,否则集群使用就会不太方便。如下图所示,从实现上来看 Management 插件对 HTTP 协议的支持,就是在开启插件的时候,会启动一个新的 HTTP Server 来监听一个新的端口。
客户端只需要访问这个端口提供的 HTTP 接口,就可以完成 HTTP 读写数据和一些集群管控的操作。如果你想了解更多细节,可以查看这个文档 Management Plugin。
开启插件后,就可以通过 HTTP 接口实现生产、消费、集群的配置、资源的创建、删除等操作。比如下面是一个查看 Vhost 列表的 curl 命令示例:
curl -i --header "authorization: Bearer <token>" http://localhost:15672/api/vhosts
RabbitMQ 从生产到消费的全过程
跟经典的消息队列一样,RabbitMQ 的生产到消费总共经过生产者、Broker、消费者三个模块。大致的流程如下:
在生产端,客户端根据 AMQP 协议定义的命令字(如 Connection.Start/Start-Ok、Connection.Tune/Tune-Ok),通过四层的 TCP 协议和 Broker 创建 Connection、Channel 进行通信。
客户端直连 Broker 服务,不需要经过寻址,然后客户端需要指定 Exchange、route_key 发送消息。因为 AMQP 没有支持批量发送的协议,消息会立即发送给给服务端。通信协议的内容格式、序列化和反序列化遵循 AMQP 的标准。
Broker 收到消息后,根据 AMQP 协议反序列化解析出请求内容。根据 Exchange 和 route_key 的信息,结合路由模式,将数据分发到具体的 Queue 中。存储层收到消息后,底层会将这条数据的结构进行整合,添加一些额外信息,如写入时间等,然后将数据写入到同一个文件存储。Broker 支持数据过期机制,当消息过期后,数据会被删除。
消费端直接指定 Queue 消费,不需要经过消费分组、分区分配的过程。消费端跟生产端一样,根据 AMQP 协议连接上 Broker 后,消费端直接从 Queue 中消费数据,消费完成后通过手动 ACK 或自动 ACK 的方式 ACK 消息。
总结
RabbitMQ 主要有 Producer、Broker、Consumer、Exchange、Queue、Route、Bind、Connection、Channel、ACK
等概念。
总结 RabbitMQ,可以从以下七个方面入手:
- 协议层基于 AMQP 标准开发。
- 网络层核心数据流基于 TCP 协议通信,并通过 Connection 和 Channel 机制实现连接的复用,以减少创建的 TCP 连接数量。
- 存储层基于多个 Queue 数据统一到一个文件存储的思路设计,同时支持分段存储和基于时间的数据过期机制。
- 元数据存储是基于 Erlang 内置的数据库 Mnesia 来实现。
- 客户端的访问是直连的,没有客户端寻址机制。
- 生产端是通过 Exchange 和 Route 写入数据的,生产数据的分发是在服务端完成的,其他消息队列的分发一般都是在客户端。
- 消费端没有消费分组、消费分区分配等概念,直连 Queue 消费,同时也提供了手动和自动两种 ACK 机制。